You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/08/15 13:44:14 UTC
[02/13] drill git commit: DRILL-5601: Rollup of external sort fixes
an improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
index 213720f..cd03b70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
@@ -19,7 +19,160 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import com.google.common.annotations.VisibleForTesting;
+/**
+ * Computes the memory needs for input batches, spill batches and merge
+ * batches. The key challenges that this code tries to overcome are:
+ * <ul>
+ * <li>Drill is not designed for the small memory allocations,
+ * but the planner may provide such allocations because the memory per
+ * query is divided among slices (minor fragments) and among buffering
+ * operators, leaving very little per operator.</li>
+ * <li>Drill does not provide the detailed memory information needed to
+ * carefully manage memory in tight constraints.</li>
+ * <li>But, Drill has a death penalty for going over the memory limit.</li>
+ * </ul>
+ * As a result, this class is a bit of a hack: it attempt to consider a
+ * number of ill-defined factors in order to divide up memory use in a
+ * way that prevents OOM errors.
+ * <p>
+ * First, it is necessary to differentiate two concepts:
+ * <ul>
+ * <li>The <i>data size</i> of a batch: the amount of memory needed to hold
+ * the data itself. The data size is constant for any given batch.</li>
+ * <li>The <i>buffer size</i> of the buffers that hold the data. The buffer
+ * size varies wildly depending on how the batch was produced.</li>
+ * </ul>
+ * The three kinds of buffer layouts seen to date include:
+ * <ul>
+ * <li>One buffer per vector component (data, offsets, null flags, etc.)
+ * – create by readers, project and other operators.</li>
+ * <li>One buffer for the entire batch, with each vector component using
+ * a slice of the overall buffer. – case for batches deserialized from
+ * exchanges.</li>
+ * <li>One buffer for each top-level vector, with component vectors
+ * using slices of the overall vector buffer – the result of reading
+ * spilled batches from disk.</li>
+ * </ul>
+ * In each case, buffer sizes are power-of-two rounded from the data size.
+ * But since the data is grouped differently in each case, the resulting buffer
+ * sizes vary considerably.
+ * <p>
+ * As a result, we can never be sure of the amount of memory needed for a
+ * batch. So, we have to estimate based on a number of factors:
+ * <ul>
+ * <li>Uses the {@link RecordBatchSizer} to estimate the data size and
+ * buffer size of each incoming batch.</li>
+ * <li>Estimates the internal fragmentation due to power-of-two rounding.</li>
+ * <li>Configured preferences for spill and output batches.</li>
+ * </ul>
+ * The code handles "normal" and "low" memory conditions.
+ * <ul>
+ * <li>In normal memory, we simply work out the number of preferred-size
+ * batches that fit in memory (based on the predicted buffer size.)</li>
+ * <li>In low memory, we divide up the available memory to produce the
+ * spill and merge batch sizes. The sizes will be less than the configured
+ * preference.</li>
+ * </ul>
+ * <p>
+ * The sort has two key configured parameters: the spill file size and the
+ * size of the output (downstream) batch. The spill file size is chosen to
+ * be large enough to ensure efficient I/O, but not so large as to overwhelm
+ * any one spill directory. The output batch size is chosen to be large enough
+ * to amortize the per-batch overhead over the maximum number of records, but
+ * not so large as to overwhelm downstream operators. Setting these parameters
+ * is a judgment call.
+ * <p>
+ * Under limited memory, the above sizes may be too big for the space available.
+ * For example, the default spill file size is 256 MB. But, if the sort is
+ * only given 50 MB, then spill files will be smaller. The default output batch
+ * size is 16 MB, but if the sort is given only 20 MB, then the output batch must
+ * be smaller. The low memory logic starts with the memory available and works
+ * backwards to figure out spill batch size, output batch size and spill file
+ * size. The sizes will be smaller than optimal, but as large as will fit in
+ * the memory provided.
+ */
+
public class SortMemoryManager {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+
+ /**
+ * Estimate for typical internal fragmentation in a buffer due to power-of-two
+ * rounding on vectors.
+ * <p>
+ * <p>
+ * <pre>[____|__$__]</pre>
+ * In the above, the brackets represent the whole vector. The
+ * first half is always full. The $ represents the end of data.
+ * When the first half filled, the second
+ * half was allocated. On average, the second half will be half full.
+ * This means that, on average, 1/4 of the allocated space is
+ * unused (the definition of internal fragmentation.)
+ */
+
+ public static final double INTERNAL_FRAGMENTATION_ESTIMATE = 1.0/4.0;
+
+ /**
+ * Given a buffer, this is the assumed amount of space
+ * available for data. (Adding more will double the buffer
+ * size half the time.)
+ */
+
+ public static final double PAYLOAD_FROM_BUFFER = 1 - INTERNAL_FRAGMENTATION_ESTIMATE;
+
+ /**
+ * Given a data size, this is the multiplier to create the buffer
+ * size estimate. (Note: since we work with aggregate batches, we
+ * cannot simply round up to the next power of two: rounding is done
+ * on a vector-by-vector basis. Here we need to estimate the aggregate
+ * effect of rounding.
+ */
+
+ public static final double BUFFER_FROM_PAYLOAD = 3.0 / 2.0;
+
+ /**
+ * On really bad days, we will add one more byte (or value) to a vector
+ * than fits in a power-of-two sized buffer, forcing a doubling. In this
+ * case, half the resulting buffer is empty.
+ */
+
+ public static final double WORST_CASE_BUFFER_RATIO = 2.0;
+
+ /**
+ * Desperate attempt to keep spill batches from being too small in low memory.
+ * <p>
+ * The number is also used for logging: the system will log a warning if
+ * batches fall below this number which may represent too little memory
+ * allocated for the job at hand. (Queries operate on big data: many records.
+ * Batches with too few records are a probable performance hit. But, what is
+ * too few? It is a judgment call.)
+ */
+
+ public static final int MIN_ROWS_PER_SORT_BATCH = 100;
+ public static final double LOW_MEMORY_MERGE_BATCH_RATIO = 0.25;
+
+ public static class BatchSizeEstimate {
+ int dataSize;
+ int expectedBufferSize;
+ int maxBufferSize;
+
+ public void setFromData(int dataSize) {
+ this.dataSize = dataSize;
+ expectedBufferSize = multiply(dataSize, BUFFER_FROM_PAYLOAD);
+ maxBufferSize = multiply(dataSize, WORST_CASE_BUFFER_RATIO);
+ }
+
+ public void setFromBuffer(int bufferSize) {
+ expectedBufferSize = bufferSize;
+ dataSize = multiply(bufferSize, PAYLOAD_FROM_BUFFER);
+ maxBufferSize = multiply(dataSize, WORST_CASE_BUFFER_RATIO);
+ }
+
+ public void setFromWorstCaseBuffer(int bufferSize) {
+ maxBufferSize = bufferSize;
+ dataSize = multiply(maxBufferSize, 1 / WORST_CASE_BUFFER_RATIO);
+ expectedBufferSize = multiply(dataSize, BUFFER_FROM_PAYLOAD);
+ }
+ }
/**
* Maximum memory this operator may use. Usually comes from the
@@ -42,13 +195,13 @@ public class SortMemoryManager {
* value.
*/
- private int expectedMergeBatchSize;
+ private final BatchSizeEstimate mergeBatchSize = new BatchSizeEstimate();
/**
* Estimate of the input batch size based on the largest batch seen
* thus far.
*/
- private int estimatedInputBatchSize;
+ private final BatchSizeEstimate inputBatchSize = new BatchSizeEstimate();
/**
* Maximum memory level before spilling occurs. That is, we can buffer input
@@ -86,7 +239,7 @@ public class SortMemoryManager {
* details of the data rows for any particular query.
*/
- private int expectedSpillBatchSize;
+ private final BatchSizeEstimate spillBatchSize = new BatchSizeEstimate();
/**
* The number of records to add to each output batch sent to the
@@ -97,24 +250,41 @@ public class SortMemoryManager {
private SortConfig config;
- private int estimatedInputSize;
-
private boolean potentialOverflow;
- public SortMemoryManager(SortConfig config, long memoryLimit) {
+ private boolean isLowMemory;
+
+ private boolean performanceWarning;
+
+ public SortMemoryManager(SortConfig config, long opMemoryLimit) {
this.config = config;
// The maximum memory this operator can use as set by the
// operator definition (propagated to the allocator.)
- if (config.maxMemory() > 0) {
- this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
- } else {
- this.memoryLimit = memoryLimit;
- }
+ final long configMemoryLimit = config.maxMemory();
+ memoryLimit = (configMemoryLimit == 0) ? opMemoryLimit
+ : Math.min(opMemoryLimit, configMemoryLimit);
preferredSpillBatchSize = config.spillBatchSize();;
preferredMergeBatchSize = config.mergeBatchSize();
+
+ // Initialize the buffer memory limit for the first batch.
+ // Assume 1/2 of (allocated - spill batch size).
+
+ bufferMemoryLimit = (memoryLimit - config.spillBatchSize()) / 2;
+ if (bufferMemoryLimit < 0) {
+ // Bad news: not enough for even the spill batch.
+ // Assume half of memory, will adjust later.
+ bufferMemoryLimit = memoryLimit / 2;
+ }
+
+ if (memoryLimit == opMemoryLimit) {
+ logger.debug("Memory config: Allocator limit = {}", memoryLimit);
+ } else {
+ logger.debug("Memory config: Allocator limit = {}, Configured limit: {}",
+ opMemoryLimit, memoryLimit);
+ }
}
/**
@@ -134,36 +304,39 @@ public class SortMemoryManager {
* phase, and how many spill batches we can merge during the merge
* phase.
*
- * @param batchSize the overall size of the current batch received from
+ * @param batchDataSize the overall size of the current batch received from
* upstream
* @param batchRowWidth the average width in bytes (including overhead) of
* rows in the current input batch
* @param batchRowCount the number of actual (not filtered) records in
* that upstream batch
+ * @return true if the estimates changed, false if the previous estimates
+ * remain valid
*/
- public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+ public boolean updateEstimates(int batchDataSize, int batchRowWidth, int batchRowCount) {
// The record count should never be zero, but better safe than sorry...
if (batchRowCount == 0) {
- return; }
+ return false; }
// Update input batch estimates.
// Go no further if nothing changed.
- if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
- return;
+ if (! updateInputEstimates(batchDataSize, batchRowWidth, batchRowCount)) {
+ return false;
}
updateSpillSettings();
updateMergeSettings();
adjustForLowMemory();
logSettings(batchRowCount);
+ return true;
}
- private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+ private boolean updateInputEstimates(int batchDataSize, int batchRowWidth, int batchRowCount) {
// The row width may end up as zero if all fields are nulls or some
// other unusual situation. In this case, assume a width of 10 just
@@ -192,17 +365,13 @@ public class SortMemoryManager {
// batch. Because we are using the actual observed batch size,
// the size already includes overhead due to power-of-two rounding.
- long origInputBatchSize = estimatedInputBatchSize;
- estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
-
- // Estimate the total size of each incoming batch plus sv2. Note that, due
- // to power-of-two rounding, the allocated sv2 size might be twice the data size.
-
- estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+ long origInputBatchSize = inputBatchSize.dataSize;
+ inputBatchSize.setFromData(Math.max(inputBatchSize.dataSize, batchDataSize));
// Return whether anything changed.
- return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
+ return estimatedRowWidth > origRowEstimate ||
+ inputBatchSize.dataSize > origInputBatchSize;
}
/**
@@ -215,18 +384,23 @@ public class SortMemoryManager {
spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
+ // But, don't allow spill batches to be too small; we pay too
+ // much overhead cost for small row counts.
+
+ spillBatchRowCount = Math.max(spillBatchRowCount, MIN_ROWS_PER_SORT_BATCH);
+
// Compute the actual spill batch size which may be larger or smaller
- // than the preferred size depending on the row width. Double the estimated
- // memory needs to allow for power-of-two rounding.
+ // than the preferred size depending on the row width.
- expectedSpillBatchSize = batchForRows(spillBatchRowCount);
+ spillBatchSize.setFromData(spillBatchRowCount * estimatedRowWidth);
// Determine the minimum memory needed for spilling. Spilling is done just
// before accepting a spill batch, so we must spill if we don't have room for a
// (worst case) input batch. To spill, we need room for the spill batch created
- // by merging the batches already in memory.
+ // by merging the batches already in memory. This is a memory calculation,
+ // so use the buffer size for the spill batch.
- bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+ bufferMemoryLimit = memoryLimit - 2 * spillBatchSize.maxBufferSize;
}
/**
@@ -238,13 +412,21 @@ public class SortMemoryManager {
private void updateMergeSettings() {
mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
- expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
+
+ // But, don't allow merge batches to be too small; we pay too
+ // much overhead cost for small row counts.
+
+ mergeBatchRowCount = Math.max(mergeBatchRowCount, MIN_ROWS_PER_SORT_BATCH);
+
+ // Compute the actual merge batch size.
+
+ mergeBatchSize.setFromData(mergeBatchRowCount * estimatedRowWidth);
// The merge memory pool assumes we can spill all input batches. The memory
// available to hold spill batches for merging is total memory minus the
// expected output batch size.
- mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+ mergeMemoryLimit = memoryLimit - mergeBatchSize.maxBufferSize;
}
/**
@@ -271,22 +453,27 @@ public class SortMemoryManager {
private void adjustForLowMemory() {
- long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
- long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
- if (loadHeadroom >= 0 && mergeHeadroom >= 0) {
- return;
- }
+ potentialOverflow = false;
+ performanceWarning = false;
- lowMemorySpillBatchSize();
- lowMemoryMergeBatchSize();
+ // Input batches are assumed to have typical fragmentation. Experience
+ // shows that spilled batches have close to the maximum fragmentation.
+
+ long loadHeadroom = bufferMemoryLimit - 2 * inputBatchSize.expectedBufferSize;
+ long mergeHeadroom = mergeMemoryLimit - 2 * spillBatchSize.maxBufferSize;
+ isLowMemory = (loadHeadroom < 0 | mergeHeadroom < 0);
+ if (! isLowMemory) {
+ return; }
+
+ lowMemoryInternalBatchSizes();
// Sanity check: if we've been given too little memory to make progress,
// issue a warning but proceed anyway. Should only occur if something is
// configured terribly wrong.
- long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
+ long minNeeds = 2 * inputBatchSize.expectedBufferSize + spillBatchSize.maxBufferSize;
if (minNeeds > memoryLimit) {
- ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
+ logger.warn("Potential memory overflow during load phase! " +
"Minimum needed = {} bytes, actual available = {} bytes",
minNeeds, memoryLimit);
bufferMemoryLimit = 0;
@@ -295,14 +482,36 @@ public class SortMemoryManager {
// Sanity check
- minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
+ minNeeds = 2 * spillBatchSize.expectedBufferSize + mergeBatchSize.expectedBufferSize;
if (minNeeds > memoryLimit) {
- ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
+ logger.warn("Potential memory overflow during merge phase! " +
"Minimum needed = {} bytes, actual available = {} bytes",
minNeeds, memoryLimit);
mergeMemoryLimit = 0;
potentialOverflow = true;
}
+
+ // Performance warning
+
+ if (potentialOverflow) {
+ return;
+ }
+ if (spillBatchSize.dataSize < config.spillBatchSize() &&
+ spillBatchRowCount < Character.MAX_VALUE) {
+ logger.warn("Potential performance degredation due to low memory. " +
+ "Preferred spill batch size: {}, actual: {}, rows per batch: {}",
+ config.spillBatchSize(), spillBatchSize.dataSize,
+ spillBatchRowCount);
+ performanceWarning = true;
+ }
+ if (mergeBatchSize.dataSize < config.mergeBatchSize() &&
+ mergeBatchRowCount < Character.MAX_VALUE) {
+ logger.warn("Potential performance degredation due to low memory. " +
+ "Preferred merge batch size: {}, actual: {}, rows per batch: {}",
+ config.mergeBatchSize(), mergeBatchSize.dataSize,
+ mergeBatchRowCount);
+ performanceWarning = true;
+ }
}
/**
@@ -312,52 +521,66 @@ public class SortMemoryManager {
* one spill batch to make progress.
*/
- private void lowMemorySpillBatchSize() {
+ private void lowMemoryInternalBatchSizes() {
// The "expected" size is with power-of-two rounding in some vectors.
// We later work backwards to the row count assuming average internal
// fragmentation.
- // Must hold two input batches. Use (most of) the rest for the spill batch.
+ // Must hold two input batches. Use half of the rest for the spill batch.
+ // In a really bad case, the number here may be negative. We'll fix
+ // it below.
- expectedSpillBatchSize = (int) (memoryLimit - 2 * estimatedInputSize);
+ int spillBufferSize = (int) (memoryLimit - 2 * inputBatchSize.maxBufferSize) / 2;
// But, in the merge phase, we need two spill batches and one output batch.
// (Assume that the spill and merge are equal sizes.)
- // Use 3/4 of memory for each batch (to allow power-of-two rounding:
- expectedSpillBatchSize = (int) Math.min(expectedSpillBatchSize, memoryLimit/3);
+ spillBufferSize = (int) Math.min(spillBufferSize, memoryLimit/4);
- // Never going to happen, but let's ensure we don't somehow create large batches.
+ // Compute the size from the buffer. Assume worst-case
+ // fragmentation (as is typical when reading from the spill file.)
- expectedSpillBatchSize = Math.max(expectedSpillBatchSize, SortConfig.MIN_SPILL_BATCH_SIZE);
+ spillBatchSize.setFromWorstCaseBuffer(spillBufferSize);
// Must hold at least one row to spill. That is, we can make progress if we
// create spill files that consist of single-record batches.
- expectedSpillBatchSize = Math.max(expectedSpillBatchSize, estimatedRowWidth);
+ int spillDataSize = Math.min(spillBatchSize.dataSize, config.spillBatchSize());
+ spillDataSize = Math.max(spillDataSize, estimatedRowWidth);
+ if (spillDataSize != spillBatchSize.dataSize) {
+ spillBatchSize.setFromData(spillDataSize);
+ }
// Work out the spill batch count needed by the spill code. Allow room for
// power-of-two rounding.
- spillBatchRowCount = rowsPerBatch(expectedSpillBatchSize);
+ spillBatchRowCount = rowsPerBatch(spillBatchSize.dataSize);
// Finally, figure out when we must spill.
- bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
- }
+ bufferMemoryLimit = memoryLimit - 2 * spillBatchSize.maxBufferSize;
+ bufferMemoryLimit = Math.max(bufferMemoryLimit, 0);
- /**
- * For merge batch, we must hold at least two spill batches and
- * one output batch.
- */
+ // Assume two spill batches must be merged (plus safety margin.)
+ // The rest can be give to the merge batch.
+
+ long mergeBufferSize = memoryLimit - 2 * spillBatchSize.maxBufferSize;
+
+ // The above calcs assume that the merge batch size is the same as
+ // the spill batch size (the division by three.)
+ // For merge batch, we must hold at least two spill batches and
+ // one output batch, which is why we assumed 3 spill batches.
- private void lowMemoryMergeBatchSize() {
- expectedMergeBatchSize = (int) (memoryLimit - 2 * expectedSpillBatchSize);
- expectedMergeBatchSize = Math.max(expectedMergeBatchSize, SortConfig.MIN_MERGE_BATCH_SIZE);
- expectedMergeBatchSize = Math.max(expectedMergeBatchSize, estimatedRowWidth);
- mergeBatchRowCount = rowsPerBatch(expectedMergeBatchSize);
- mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+ mergeBatchSize.setFromBuffer((int) mergeBufferSize);
+ int mergeDataSize = Math.min(mergeBatchSize.dataSize, config.mergeBatchSize());
+ mergeDataSize = Math.max(mergeDataSize, estimatedRowWidth);
+ if (mergeDataSize != mergeBatchSize.dataSize) {
+ mergeBatchSize.setFromData(spillDataSize);
+ }
+
+ mergeBatchRowCount = rowsPerBatch(mergeBatchSize.dataSize);
+ mergeMemoryLimit = Math.max(2 * spillBatchSize.expectedBufferSize, memoryLimit - mergeBatchSize.maxBufferSize);
}
/**
@@ -367,14 +590,34 @@ public class SortMemoryManager {
private void logSettings(int actualRecordCount) {
- ExternalSortBatch.logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
- estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
- ExternalSortBatch.logger.debug("Merge batch size = {} bytes, {} records; spill file size: {} bytes",
- expectedSpillBatchSize, spillBatchRowCount, config.spillFileSize());
- ExternalSortBatch.logger.debug("Output batch size = {} bytes, {} records",
- expectedMergeBatchSize, mergeBatchRowCount);
- ExternalSortBatch.logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
+ logger.debug("Input Batch Estimates: record size = {} bytes; net = {} bytes, gross = {}, records = {}",
+ estimatedRowWidth, inputBatchSize.dataSize,
+ inputBatchSize.expectedBufferSize, actualRecordCount);
+ logger.debug("Spill batch size: net = {} bytes, gross = {} bytes, records = {}; spill file = {} bytes",
+ spillBatchSize.dataSize, spillBatchSize.expectedBufferSize,
+ spillBatchRowCount, config.spillFileSize());
+ logger.debug("Output batch size: net = {} bytes, gross = {} bytes, records = {}",
+ mergeBatchSize.dataSize, mergeBatchSize.expectedBufferSize,
+ mergeBatchRowCount);
+ logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
memoryLimit, bufferMemoryLimit, mergeMemoryLimit);
+
+ // Performance warnings due to low row counts per batch.
+ // Low row counts cause excessive per-batch overhead and hurt
+ // performance.
+
+ if (spillBatchRowCount < MIN_ROWS_PER_SORT_BATCH) {
+ logger.warn("Potential performance degredation due to low memory or large input row. " +
+ "Preferred spill batch row count: {}, actual: {}",
+ MIN_ROWS_PER_SORT_BATCH, spillBatchRowCount);
+ performanceWarning = true;
+ }
+ if (mergeBatchRowCount < MIN_ROWS_PER_SORT_BATCH) {
+ logger.warn("Potential performance degredation due to low memory or large input row. " +
+ "Preferred merge batch row count: {}, actual: {}",
+ MIN_ROWS_PER_SORT_BATCH, mergeBatchRowCount);
+ performanceWarning = true;
+ }
}
public enum MergeAction { SPILL, MERGE, NONE }
@@ -389,86 +632,120 @@ public class SortMemoryManager {
}
}
+ /**
+ * Choose a consolidation option during the merge phase depending on memory
+ * available. Preference is given to moving directly onto merging (with no
+ * additional spilling) when possible. But, if memory pressures don't allow
+ * this, we must spill batches and/or merge on-disk spilled runs, to reduce
+ * the final set of runs to something that can be merged in the available
+ * memory.
+ * <p>
+ * Logic is here (returning an enum) rather than in the merge code to allow
+ * unit testing without actually needing batches in memory.
+ *
+ * @param allocMemory
+ * amount of memory currently allocated (this class knows the total
+ * memory available)
+ * @param inMemCount
+ * number of incoming batches in memory (the number is important, not
+ * the in-memory size; we get the memory size from
+ * <tt>allocMemory</tt>)
+ * @param spilledRunsCount
+ * the number of runs sitting on disk to be merged
+ * @return whether to <tt>SPILL</tt> in-memory batches, whether to
+ * <tt>MERGE<tt> on-disk batches to create a new, larger run, or whether
+ * to do nothing (<tt>NONE</tt>) and instead advance to the final merge
+ */
+
public MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount) {
- // Determine additional memory needed to hold one batch from each
- // spilled run.
+ assert allocMemory == 0 || inMemCount > 0;
+ assert inMemCount + spilledRunsCount > 0;
- // If the on-disk batches and in-memory batches need more memory than
- // is available, spill some in-memory batches.
+ // If only one spilled run, then merging is not productive regardless
+ // of memory limits.
- if (inMemCount > 0) {
- long mergeSize = spilledRunsCount * expectedSpillBatchSize;
- if (allocMemory + mergeSize > mergeMemoryLimit) {
- return new MergeTask(MergeAction.SPILL, 0);
- }
+ if (inMemCount == 0 && spilledRunsCount <= 1) {
+ return new MergeTask(MergeAction.NONE, 0);
}
- // Maximum batches that fit into available memory.
+ // If memory is above the merge memory limit, then must spill
+ // merge to create room for a merge batch.
- int mergeLimit = (int) ((mergeMemoryLimit - allocMemory) / expectedSpillBatchSize);
+ if (allocMemory > mergeMemoryLimit) {
+ return new MergeTask(MergeAction.SPILL, 0);
+ }
- // Can't merge more than the merge limit.
+ // Determine additional memory needed to hold one batch from each
+ // spilled run.
+
+ // Maximum spill batches that fit into available memory.
- mergeLimit = Math.min(mergeLimit, config.mergeLimit());
+ int memMergeLimit = (int) ((mergeMemoryLimit - allocMemory) /
+ spillBatchSize.expectedBufferSize);
+ memMergeLimit = Math.max(0, memMergeLimit);
- // How many batches to merge?
+ // If batches are in memory, and we need more memory to merge
+ // them all than is actually available, then spill some in-memory
+ // batches.
+
+ if (inMemCount > 0 && memMergeLimit < spilledRunsCount) {
+ return new MergeTask(MergeAction.SPILL, 0);
+ }
- int mergeCount = spilledRunsCount - mergeLimit;
- if (mergeCount <= 0) {
+ // If all batches fit in memory, then no need for a second-generation
+ // merge/spill.
+
+ memMergeLimit = Math.min(memMergeLimit, config.mergeLimit());
+ int mergeRunCount = spilledRunsCount - memMergeLimit;
+ if (mergeRunCount <= 0) {
return new MergeTask(MergeAction.NONE, 0);
}
- // We will merge. This will create yet another spilled
- // run. Account for that.
+ // We need a second generation load-merge-spill cycle
+ // to reduce the number of spilled runs to a smaller set
+ // that will fit in memory.
+
+ // Merging creates another batch. Include one more run
+ // in the merge to create space for the new run.
+
+ mergeRunCount += 1;
- mergeCount += 1;
+ // Merge only as many batches as fit in memory.
+ // Use all memory for this process; no need to reserve space for a
+ // merge output batch. Assume worst case since we are forced to
+ // accept spilled batches blind: we can't limit reading based on memory
+ // limits. Subtract one to allow for the output spill batch.
+
+ memMergeLimit = (int)(memoryLimit / spillBatchSize.maxBufferSize) - 1;
+ mergeRunCount = Math.min(mergeRunCount, memMergeLimit);
// Must merge at least 2 batches to make progress.
- // This is the the (at least one) excess plus the allowance
- // above for the new one.
+ // We know we have at least two because of the check done above.
- // Can't merge more than the limit.
+ mergeRunCount = Math.max(mergeRunCount, 2);
- mergeCount = Math.min(mergeCount, config.mergeLimit());
+ // Can't merge more than the merge limit.
- // Do the merge, then loop to try again in case not
- // all the target batches spilled in one go.
+ mergeRunCount = Math.min(mergeRunCount, config.mergeLimit());
- return new MergeTask(MergeAction.MERGE, mergeCount);
+ return new MergeTask(MergeAction.MERGE, mergeRunCount);
}
/**
- * Compute the number of rows per batch assuming that the batch is
- * subject to average internal fragmentation due to power-of-two
- * rounding on vectors.
- * <p>
- * <pre>[____|__$__]</pre>
- * In the above, the brackets represent the whole vector. The
- * first half is always full. When the first half filled, the second
- * half was allocated. On average, the second half will be half full.
+ * Compute the number of rows that fit into a given batch data size.
*
* @param batchSize expected batch size, including internal fragmentation
* @return number of rows that fit into the batch
*/
private int rowsPerBatch(int batchSize) {
- int rowCount = batchSize * 3 / 4 / estimatedRowWidth;
+ int rowCount = batchSize / estimatedRowWidth;
return Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
}
- /**
- * Compute the expected number of rows that fit into a given size
- * batch, accounting for internal fragmentation due to power-of-two
- * rounding on vector allocations.
- *
- * @param rowCount the desired number of rows in the batch
- * @return the size of resulting batch, including power-of-two
- * rounding.
- */
-
- private int batchForRows(int rowCount) {
- return estimatedRowWidth * rowCount * 4 / 3;
+ public static int multiply(int byteSize, double multiplier) {
+ return (int) Math.floor(byteSize * multiplier);
}
// Must spill if we are below the spill point (the amount of memory
@@ -497,17 +774,21 @@ public class SortMemoryManager {
@VisibleForTesting
public int getRowWidth() { return estimatedRowWidth; }
@VisibleForTesting
- public int getInputBatchSize() { return estimatedInputBatchSize; }
+ public BatchSizeEstimate getInputBatchSize() { return inputBatchSize; }
@VisibleForTesting
public int getPreferredSpillBatchSize() { return preferredSpillBatchSize; }
@VisibleForTesting
public int getPreferredMergeBatchSize() { return preferredMergeBatchSize; }
@VisibleForTesting
- public int getSpillBatchSize() { return expectedSpillBatchSize; }
+ public BatchSizeEstimate getSpillBatchSize() { return spillBatchSize; }
@VisibleForTesting
- public int getMergeBatchSize() { return expectedMergeBatchSize; }
+ public BatchSizeEstimate getMergeBatchSize() { return mergeBatchSize; }
@VisibleForTesting
public long getBufferMemoryLimit() { return bufferMemoryLimit; }
@VisibleForTesting
public boolean mayOverflow() { return potentialOverflow; }
+ @VisibleForTesting
+ public boolean isLowMemory() { return isLowMemory; }
+ @VisibleForTesting
+ public boolean hasPerformanceWarning() { return performanceWarning; }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
index 4231cf4..0f27884 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -83,10 +83,9 @@ public class SorterWrapper extends BaseSortWrapper {
ClassGenerator<SingleBatchSorter> g = cg.getRoot();
cg.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
- cg.saveCodeForDebugging(true);
+// cg.saveCodeForDebugging(true);
generateComparisons(g, batch, logger);
return getInstance(cg, logger);
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index a6042c6..b75ce77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorContainer;
import com.google.common.collect.Lists;
@@ -86,13 +87,14 @@ public class SpilledRuns {
return batchesToSpill;
}
- public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount) {
- spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount));
+ public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
+ spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper));
logger.trace("Completed spill: memory = {}",
context.getAllocator().getAllocatedMemory());
}
- public void mergeRuns(int targetCount, long mergeMemoryPool, int spillBatchRowCount) {
+ public void mergeRuns(int targetCount, long mergeMemoryPool,
+ int spillBatchRowCount, VectorInitializer allocHelper) {
long allocated = context.getAllocator().getAllocatedMemory();
mergeMemoryPool -= context.getAllocator().getAllocatedMemory();
@@ -128,12 +130,12 @@ public class SpilledRuns {
// Do the actual spill.
List<BatchGroup> batchesToSpill = prepareSpillBatches(spilledRuns, mergeCount);
- mergeAndSpill(batchesToSpill, spillBatchRowCount);
+ mergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
}
- private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) {
+ private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount, VectorInitializer allocHelper) {
try {
- return doMergeAndSpill(batchesToSpill, spillBatchRowCount);
+ return doMergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper);
}
// If error is a User Exception, just use as is.
@@ -145,7 +147,8 @@ public class SpilledRuns {
}
}
- private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) throws Throwable {
+ private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill,
+ int spillBatchRowCount, VectorInitializer allocHelper) throws Throwable {
// Merge the selected set of matches and write them to the
// spill file. After each write, we release the memory associated
@@ -155,7 +158,8 @@ public class SpilledRuns {
BatchGroup.SpilledRun newGroup = null;
VectorContainer dest = new VectorContainer();
try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
- PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, dest, spillBatchRowCount)) {
+ PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill,
+ dest, spillBatchRowCount, allocHelper)) {
newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, context.getAllocator());
logger.trace("Spilling {} batches, into spill batches of {} rows, to {}",
batchesToSpill.size(), spillBatchRowCount, outputFile);
@@ -175,9 +179,9 @@ public class SpilledRuns {
}
context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
newGroup.closeOutputStream();
- logger.trace("Spilled {} output batches, each of {} by bytes, {} records to {}",
- merger.getBatchCount(), merger.getRecordCount(),
- merger.getEstBatchSize(), outputFile);
+ logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}",
+ merger.getBatchCount(), merger.getEstBatchSize(),
+ spillBatchRowCount, outputFile);
newGroup.setBatchSize(merger.getEstBatchSize());
return newGroup;
} catch (Throwable e) {
@@ -192,7 +196,8 @@ public class SpilledRuns {
}
}
- public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches, VectorContainer container, int mergeRowCount) {
+ public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches,
+ VectorContainer container, int mergeRowCount, VectorInitializer allocHelper) {
List<BatchGroup> allBatches = new LinkedList<>();
allBatches.addAll(bufferedBatches);
bufferedBatches.clear();
@@ -200,7 +205,7 @@ public class SpilledRuns {
spilledRuns.clear();
logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
allBatches.size(), context.getAllocator().getAllocatedMemory());
- return copierHolder.startMerge(schema, allBatches, container, mergeRowCount);
+ return copierHolder.startMerge(schema, allBatches, container, mergeRowCount, allocHelper);
}
public void close() {
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index ca275c7..eb90614 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -117,17 +117,19 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return IterOutcome.STOP;
}
next = b.next();
- }finally{
+ } finally {
stats.startProcessing();
}
- switch(next){
+ switch(next) {
case OK_NEW_SCHEMA:
stats.batchReceived(inputIndex, b.getRecordCount(), true);
break;
case OK:
stats.batchReceived(inputIndex, b.getRecordCount(), false);
break;
+ default:
+ break;
}
return next;
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 44c6b1a..4e47051 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -81,6 +81,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
}
}
+ @SuppressWarnings("resource")
@Override
public VectorWrapper<?> getChildWrapper(int[] ids) {
if (ids.length == 1) {
@@ -105,6 +106,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors);
}
+ @SuppressWarnings("resource")
@Override
public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
ValueVector v = vectors[0];
@@ -112,7 +114,6 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
}
@Override
- @SuppressWarnings("unchecked")
public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) {
return new HyperVectorWrapper<T>(f, vectors, false);
// T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length);
@@ -128,12 +129,14 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
return new HyperVectorWrapper<T>(f, v, releasable);
}
+ @SuppressWarnings("unchecked")
public void addVector(ValueVector v) {
Preconditions.checkArgument(v.getClass() == this.getVectorClass(), String.format("Cannot add vector type %s to hypervector type %s for field %s",
v.getClass(), this.getVectorClass(), v.getField()));
vectors = (T[]) ArrayUtils.add(vectors, v);// TODO optimize this so not copying every time
}
+ @SuppressWarnings("unchecked")
public void addVectors(ValueVector[] vv) {
vectors = (T[]) ArrayUtils.add(vectors, vv);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 0daa6b3..b4ae2d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,8 +19,6 @@ package org.apache.drill.exec.record;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
/**
* A record batch contains a set of field values for a particular range of
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
new file mode 100644
index 0000000..5dd348e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorInitializer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+/**
+ * Prototype mechanism to allocate vectors based on expected
+ * data sizes. This version uses a name-based map of fields
+ * to sizes. Better to represent the batch structurally and
+ * simply iterate over the schema rather than doing a per-field
+ * lookup. But, the mechanisms needed to do the efficient solution
+ * don't exist yet.
+ */
+
+public class VectorInitializer {
+
+ private static class AllocationHint {
+ public final int entryWidth;
+ public final int elementCount;
+
+ private AllocationHint(int width, int elements) {
+ entryWidth = width;
+ elementCount = elements;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder()
+ .append("{");
+ String sep = "";
+ if (entryWidth > 0) {
+ buf.append("width=")
+ .append(entryWidth);
+ sep = ", ";
+ }
+ if (elementCount > 0) {
+ buf.append(sep)
+ .append("elements=")
+ .append(elementCount);
+ }
+ buf.append("}");
+ return buf.toString();
+ }
+ }
+
+ private Map<String, AllocationHint> hints = new HashMap<>();
+
+ public void variableWidth(String name, int width) {
+ hints.put(name, new AllocationHint(width, 1));
+ }
+
+ public void fixedWidthArray(String name, int elements) {
+ hints.put(name, new AllocationHint(0, elements));
+ }
+
+ public void variableWidthArray(String name, int width, int elements) {
+ hints.put(name, new AllocationHint(width, elements));
+ }
+
+ public void allocateBatch(VectorAccessible va, int recordCount) {
+ for (VectorWrapper<?> w: va) {
+ allocateVector(w.getValueVector(), "", recordCount);
+ }
+ }
+
+ private void allocateVector(ValueVector vector, String prefix, int recordCount) {
+ String key = prefix + vector.getField().getName();
+ AllocationHint hint = hints.get(key);
+ if (vector instanceof AbstractMapVector) {
+ allocateMap((AbstractMapVector) vector, prefix, recordCount, hint);
+ } else {
+ allocateVector(vector, recordCount, hint);
+ }
+// Set<BufferLedger> ledgers = new HashSet<>();
+// vector.getLedgers(ledgers);
+// int size = 0;
+// for (BufferLedger ledger : ledgers) {
+// size += ledger.getAccountedSize();
+// }
+// System.out.println(key + ": " + vector.getField().toString() +
+// " " +
+// ((hint == null) ? "no hint" : hint.toString()) +
+// ", " + size);
+ }
+
+ private void allocateVector(ValueVector vector, int recordCount, AllocationHint hint) {
+ if (hint == null) {
+ // Use hard-coded values. Same as ScanBatch
+
+ AllocationHelper.allocate(vector, recordCount, 50, 10);
+ } else {
+ AllocationHelper.allocate(vector, recordCount, hint.entryWidth, hint.elementCount);
+ }
+ }
+
+ private void allocateMap(AbstractMapVector map, String prefix, int recordCount, AllocationHint hint) {
+ if (map instanceof RepeatedMapVector) {
+ ((RepeatedMapVector) map).allocateOffsetsNew(recordCount);
+ if (hint == null) {
+ recordCount *= 10;
+ } else {
+ recordCount *= hint.elementCount;
+ }
+ }
+ prefix += map.getField().getName() + ".";
+ for (ValueVector vector : map) {
+ allocateVector(vector, prefix, recordCount);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("[" + getClass().getSimpleName())
+ .append(" ");
+ boolean first = true;
+ for (Entry<String, AllocationHint>entry : hints.entrySet()) {
+ if (! first) {
+ buf.append(", ");
+ }
+ first = false;
+ buf.append("[")
+ .append(entry.getKey())
+ .append(" ")
+ .append(entry.getValue().toString())
+ .append("]");
+ }
+ buf.append("]");
+ return buf.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index bcec920..b3b46c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -80,6 +80,7 @@ public class WritableBatch implements AutoCloseable {
len += b.capacity();
}
+ @SuppressWarnings("resource")
DrillBuf newBuf = allocator.buffer(len);
try {
/* Copy data from each buffer into the compound buffer */
@@ -101,7 +102,9 @@ public class WritableBatch implements AutoCloseable {
for (VectorWrapper<?> vv : container) {
SerializedField fmd = fields.get(vectorIndex);
+ @SuppressWarnings("resource")
ValueVector v = vv.getValueVector();
+ @SuppressWarnings("resource")
DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
// v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
v.load(fmd, bb);
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 0d341df..7ed9220 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -293,6 +293,7 @@ public class Drillbit implements AutoCloseable {
return start(config, null);
}
+ @SuppressWarnings("resource")
public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
throws DrillbitStartupException {
logger.debug("Starting new Drillbit.");
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 437862e..41ecc95 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -251,8 +251,6 @@ drill.exec: {
external: {
// Drill uses the managed External Sort Batch by default.
// Set this to true to use the legacy, unmanaged version.
- // Disabled in the intial commit, to be enabled after
- // tests are committed.
disable_managed: true,
// Limit on the number of batches buffered in memory.
// Primarily for testing.
@@ -282,9 +280,9 @@ drill.exec: {
directories: ${drill.exec.spill.directories},
// Size of the batches written to, and read from, the spill files.
// Determines the ratio of memory to input data size for a single-
- // generation sort. Smaller values give larger ratios, but at a
- // (high) cost of much greater disk seek times.
- spill_batch_size = 8M,
+ // generation sort. Smaller values are better, but too small
+ // incurs per-batch overhead.
+ spill_batch_size = 1M,
// Preferred file size for "first-generation" spill files.
// Set large enough to get long, continuous writes, but not so
// large as to overwhelm a temp directory.
@@ -292,7 +290,8 @@ drill.exec: {
file_size: 256M,
// Size of the batch sent downstream from the sort operator during
// the merge phase. Don't change this unless you know what you are doing,
- // larger sizes can result in memory fragmentation.
+ // larger sizes can result in memory fragmentation, smaller sizes
+ // in excessive operator iterator overhead.
merge_batch_size = 16M
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 7700a1e..ee350ce 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -31,7 +31,6 @@ import org.junit.Test;
import java.util.List;
public class TestUnionAll extends BaseTestQuery{
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestUnionAll.class);
private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index 05670c5..cfb8645 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -17,9 +17,6 @@
*/
package org.apache.drill.exec.cache;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -119,8 +116,6 @@ public class TestBatchSerialization extends DrillTest {
*/
private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {
- long origSize = rowSet.size();
-
File dir = OperatorFixture.getTempDir("serial");
File outFile = new File(dir, "serialze.dat");
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
@@ -135,7 +130,6 @@ public class TestBatchSerialization extends DrillTest {
.read());
}
- assertTrue(origSize >= result.size());
new RowSetComparison(expected)
.verifyAndClearAll(result);
outFile.delete();
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 76f0935..73f9b6d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -440,6 +440,11 @@ public class TestWindowFrame extends BaseTestQuery {
.go();
}
+ // Note: This test is unstable. It works when forcing the merge/sort batch
+ // size to 20, but not for other sizes. The problem is either that the results
+ // are not ordered (and so subject to sort instability), or there is some bug
+ // somewhere in the window functions.
+
@Test
public void test4657() throws Exception {
testBuilder()
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index f643d5f..7e63600 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -35,14 +35,17 @@ import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.test.SecondaryTest;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
+@Category(SecondaryTest.class)
public class TestSimpleExternalSort extends DrillTest {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class);
- @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000);
+ @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80_000);
@Test
public void mergeSortWithSv2Managed() throws Exception {
@@ -100,7 +103,7 @@ public class TestSimpleExternalSort extends DrillTest {
ClientFixture client = cluster.clientFixture()) {
chooseImpl(client, testLegacy);
List<QueryDataBatch> results = client.queryBuilder().physicalResource("xsort/one_key_sort_descending.json").results();
- assertEquals(1000000, client.countResults(results));
+ assertEquals(1_000_000, client.countResults(results));
validateResults(client.allocator(), results);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
index 5a1bf6d..bbb48af 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -64,8 +64,8 @@ public class TestSortSpillWithException extends ClusterTest {
// inject exception in sort while spilling
final String controls = Controls.newBuilder()
.addExceptionOnBit(
- org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class,
- org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+ ExternalSortBatch.class,
+ ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
IOException.class,
cluster.drillbit().getContext().getEndpoint())
.build();
@@ -87,8 +87,8 @@ public class TestSortSpillWithException extends ClusterTest {
// inject exception in sort while spilling
final String controls = Controls.newBuilder()
.addExceptionOnBit(
- org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.class,
- org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+ ExternalSortBatch.class,
+ ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
IOException.class,
cluster.drillbit().getContext().getEndpoint())
.build();
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
index 1a4d4b2..8ba34ef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java
@@ -104,7 +104,7 @@ public class SortTestUtilities {
VectorContainer dest = new VectorContainer();
@SuppressWarnings("resource")
BatchMerger merger = copier.startMerge(schema.toBatchSchema(SelectionVectorMode.NONE),
- batches, dest, rowCount);
+ batches, dest, rowCount, null);
verifyResults(merger, dest);
dest.clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
index 0050747..6464b5a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestCopier.java
@@ -69,8 +69,13 @@ public class TestCopier extends DrillTest {
PriorityQueueCopierWrapper copier = SortTestUtilities.makeCopier(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
VectorContainer dest = new VectorContainer();
try {
+ // TODO: Create a vector allocator to pass as last parameter so
+ // that the test uses the same vector allocator as the production
+ // code. Only nuisance is that we don't have the required metadata
+ // readily at hand here...
+
@SuppressWarnings({ "resource", "unused" })
- BatchMerger merger = copier.startMerge(schema, batches, dest, 10);
+ BatchMerger merger = copier.startMerge(schema, batches, dest, 10, null);
fail();
} catch (AssertionError e) {
// Expected
http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index 6bff088..69e9e1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -47,12 +47,14 @@ public class TestExternalSortInternals extends DrillTest {
assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit());
// Default size: 256 MiB
assertEquals(256 * ONE_MEG, sortConfig.spillFileSize());
- // Default size: 8 MiB
- assertEquals(8 * ONE_MEG, sortConfig.spillBatchSize());
+ // Default size: 1 MiB
+ assertEquals(ONE_MEG, sortConfig.spillBatchSize());
// Default size: 16 MiB
assertEquals(16 * ONE_MEG, sortConfig.mergeBatchSize());
// Default: unlimited
assertEquals(Integer.MAX_VALUE, sortConfig.getBufferedBatchLimit());
+ // Default: 64K
+ assertEquals(Character.MAX_VALUE, sortConfig.getMSortBatchSize());
}
/**
@@ -69,6 +71,7 @@ public class TestExternalSortInternals extends DrillTest {
.put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
.put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
.put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
+ .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 10)
.build();
SortConfig sortConfig = new SortConfig(drillConfig);
assertEquals(2000 * 1024, sortConfig.maxMemory());
@@ -77,6 +80,7 @@ public class TestExternalSortInternals extends DrillTest {
assertEquals(500_000, sortConfig.spillBatchSize());
assertEquals(600_000, sortConfig.mergeBatchSize());
assertEquals(50, sortConfig.getBufferedBatchLimit());
+ assertEquals(10, sortConfig.getMSortBatchSize());
}
/**
@@ -90,6 +94,7 @@ public class TestExternalSortInternals extends DrillTest {
.put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1)
.put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1)
.put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
+ .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 0)
.build();
SortConfig sortConfig = new SortConfig(drillConfig);
assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit());
@@ -97,13 +102,14 @@ public class TestExternalSortInternals extends DrillTest {
assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, sortConfig.spillBatchSize());
assertEquals(SortConfig.MIN_MERGE_BATCH_SIZE, sortConfig.mergeBatchSize());
assertEquals(2, sortConfig.getBufferedBatchLimit());
+ assertEquals(1, sortConfig.getMSortBatchSize());
}
@Test
public void testMemoryManagerBasics() {
DrillConfig drillConfig = DrillConfig.create();
SortConfig sortConfig = new SortConfig(drillConfig);
- long memoryLimit = 50 * ONE_MEG;
+ long memoryLimit = 70 * ONE_MEG;
SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
// Basic setup
@@ -120,35 +126,35 @@ public class TestExternalSortInternals extends DrillTest {
int rowCount = 10000;
int batchSize = rowWidth * rowCount * 2;
- memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
// Zero rows - no update
- memManager.updateEstimates(batchSize, rowWidth, 0);
+ assertFalse(memManager.updateEstimates(batchSize, rowWidth, 0));
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
// Larger batch size, update batch size
rowCount = 20000;
batchSize = rowWidth * rowCount * 2;
- memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
// Smaller batch size: no change
rowCount = 5000;
int lowBatchSize = rowWidth * rowCount * 2;
- memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+ assertFalse(memManager.updateEstimates(lowBatchSize, rowWidth, rowCount));
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
// Different batch density, update batch size
rowCount = 10000;
batchSize = rowWidth * rowCount * 5;
- memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ assertTrue(memManager.updateEstimates(batchSize, rowWidth, rowCount));
verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
// Smaller row size, no update
@@ -156,23 +162,23 @@ public class TestExternalSortInternals extends DrillTest {
int lowRowWidth = 200;
rowCount = 10000;
lowBatchSize = rowWidth * rowCount * 2;
- memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount);
+ assertFalse(memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount));
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
// Larger row size, updates calcs
rowWidth = 400;
rowCount = 10000;
lowBatchSize = rowWidth * rowCount * 2;
- memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
+ assertTrue(memManager.updateEstimates(lowBatchSize, rowWidth, rowCount));
verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, rowCount);
// EOF: very low density
- memManager.updateEstimates(lowBatchSize, rowWidth, 5);
+ assertFalse(memManager.updateEstimates(lowBatchSize, rowWidth, 5));
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
}
private void verifyCalcs(SortConfig sortConfig, long memoryLimit, SortMemoryManager memManager, int batchSize,
@@ -183,7 +189,7 @@ public class TestExternalSortInternals extends DrillTest {
// Row and batch sizes should be exact
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
// Spill sizes will be rounded, but within reason.
@@ -191,9 +197,9 @@ public class TestExternalSortInternals extends DrillTest {
assertTrue(count >= memManager.getSpillBatchRowCount());
assertTrue(count/2 <= memManager.getSpillBatchRowCount());
int spillSize = memManager.getSpillBatchRowCount() * rowWidth;
- assertTrue(spillSize <= memManager.getSpillBatchSize());
- assertTrue(spillSize >= memManager.getSpillBatchSize()/2);
- assertEquals(memoryLimit - memManager.getSpillBatchSize(), memManager.getBufferMemoryLimit());
+ assertTrue(spillSize <= memManager.getSpillBatchSize().dataSize);
+ assertTrue(spillSize >= memManager.getSpillBatchSize().dataSize/2);
+ assertTrue(memManager.getBufferMemoryLimit() <= memoryLimit - memManager.getSpillBatchSize().expectedBufferSize );
// Merge sizes will also be rounded, within reason.
@@ -201,9 +207,9 @@ public class TestExternalSortInternals extends DrillTest {
assertTrue(count >= memManager.getMergeBatchRowCount());
assertTrue(count/2 <= memManager.getMergeBatchRowCount());
int mergeSize = memManager.getMergeBatchRowCount() * rowWidth;
- assertTrue(mergeSize <= memManager.getMergeBatchSize());
- assertTrue(mergeSize >= memManager.getMergeBatchSize()/2);
- assertEquals(memoryLimit - memManager.getMergeBatchSize(), memManager.getMergeMemoryLimit());
+ assertTrue(mergeSize <= memManager.getMergeBatchSize().dataSize);
+ assertTrue(mergeSize >= memManager.getMergeBatchSize().dataSize/2);
+ assertTrue(memManager.getMergeMemoryLimit() <= memoryLimit - memManager.getMergeBatchSize().expectedBufferSize);
}
@Test
@@ -220,7 +226,7 @@ public class TestExternalSortInternals extends DrillTest {
int batchSize = rowCount * 2;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
assertEquals(10, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
// Truncate spill, merge batch row count
@@ -234,12 +240,12 @@ public class TestExternalSortInternals extends DrillTest {
// Small, but non-zero, row
- rowWidth = 20;
+ rowWidth = 10;
rowCount = 10000;
batchSize = rowWidth * rowCount;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
// Truncate spill, merge batch row count
@@ -256,69 +262,89 @@ public class TestExternalSortInternals extends DrillTest {
public void testLowMemory() {
DrillConfig drillConfig = DrillConfig.create();
SortConfig sortConfig = new SortConfig(drillConfig);
- long memoryLimit = 10 * ONE_MEG;
+ int memoryLimit = 10 * ONE_MEG;
SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
// Tight squeeze, but can be made to work.
- // Input batches are a quarter of memory.
+ // Input batch buffer size is a quarter of memory.
int rowWidth = 1000;
- int rowCount = (int) (memoryLimit / 4 / rowWidth);
- int batchSize = rowCount * rowWidth;
+ int batchSize = SortMemoryManager.multiply(memoryLimit / 4, SortMemoryManager.PAYLOAD_FROM_BUFFER);
+ int rowCount = batchSize / rowWidth;
+ batchSize = rowCount * rowWidth;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
assertFalse(memManager.mayOverflow());
+ assertTrue(memManager.hasPerformanceWarning());
// Spill, merge batches should be constrained
- int spillBatchSize = memManager.getSpillBatchSize();
+ int spillBatchSize = memManager.getSpillBatchSize().dataSize;
assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
assertTrue(spillBatchSize >= rowWidth);
assertTrue(spillBatchSize <= memoryLimit / 3);
assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
- int mergeBatchSize = memManager.getMergeBatchSize();
+ int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
assertTrue(mergeBatchSize >= rowWidth);
assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
- // Should spill after just two batches
+ // Should spill after just two or three batches
- assertFalse(memManager.isSpillNeeded(0, batchSize));
- assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
- assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+ int inputBufferSize = memManager.getInputBatchSize().expectedBufferSize;
+ assertFalse(memManager.isSpillNeeded(0, inputBufferSize));
+ assertFalse(memManager.isSpillNeeded(batchSize, inputBufferSize));
+ assertTrue(memManager.isSpillNeeded(3 * inputBufferSize, inputBufferSize));
+ }
+
+ @Test
+ public void testLowerMemory() {
+ DrillConfig drillConfig = DrillConfig.create();
+ SortConfig sortConfig = new SortConfig(drillConfig);
+ int memoryLimit = 10 * ONE_MEG;
+ SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
// Tighter squeeze, but can be made to work.
// Input batches are 3/8 of memory; two fill 3/4,
// but small spill and merge batches allow progress.
- rowWidth = 1000;
- rowCount = (int) (memoryLimit * 3 / 8 / rowWidth);
+ int rowWidth = 1000;
+ int batchSize = SortMemoryManager.multiply(memoryLimit * 3 / 8, SortMemoryManager.PAYLOAD_FROM_BUFFER);
+ int rowCount = batchSize / rowWidth;
batchSize = rowCount * rowWidth;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
assertFalse(memManager.mayOverflow());
+ assertTrue(memManager.hasPerformanceWarning());
// Spill, merge batches should be constrained
- spillBatchSize = memManager.getSpillBatchSize();
+ int spillBatchSize = memManager.getSpillBatchSize().dataSize;
assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
assertTrue(spillBatchSize >= rowWidth);
assertTrue(spillBatchSize <= memoryLimit / 3);
assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
- assertTrue(memManager.getSpillBatchRowCount() > 1);
+ assertTrue(memManager.getSpillBatchRowCount() >= 1);
assertTrue(spillBatchSize / rowWidth >= memManager.getSpillBatchRowCount());
- mergeBatchSize = memManager.getMergeBatchSize();
+ int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
assertTrue(mergeBatchSize >= rowWidth);
assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
assertTrue(memManager.getMergeBatchRowCount() > 1);
assertTrue(mergeBatchSize / rowWidth >= memManager.getMergeBatchRowCount());
+
+ // Should spill after just two batches
+
+ int inputBufferSize = memManager.getInputBatchSize().expectedBufferSize;
+ assertFalse(memManager.isSpillNeeded(0, inputBufferSize));
+ assertFalse(memManager.isSpillNeeded(batchSize, inputBufferSize));
+ assertTrue(memManager.isSpillNeeded(2 * inputBufferSize, inputBufferSize));
}
@Test
@@ -333,21 +359,22 @@ public class TestExternalSortInternals extends DrillTest {
// Have to back off the exact size a bit to allow for internal fragmentation
// in the merge and output batches.
- int rowWidth = (int) (memoryLimit / 3 * 0.75);
+ int rowWidth = (int) (memoryLimit / 3 / 2);
int rowCount = 1;
int batchSize = rowWidth;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
assertEquals(rowWidth, memManager.getRowWidth());
- assertEquals(batchSize, memManager.getInputBatchSize());
+ assertEquals(batchSize, memManager.getInputBatchSize().dataSize);
assertFalse(memManager.mayOverflow());
+ assertTrue(memManager.hasPerformanceWarning());
- int spillBatchSize = memManager.getSpillBatchSize();
+ int spillBatchSize = memManager.getSpillBatchSize().dataSize;
assertTrue(spillBatchSize >= rowWidth);
assertTrue(spillBatchSize <= memoryLimit / 3);
assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
assertEquals(1, memManager.getSpillBatchRowCount());
- int mergeBatchSize = memManager.getMergeBatchSize();
+ int mergeBatchSize = memManager.getMergeBatchSize().dataSize;
assertTrue(mergeBatchSize >= rowWidth);
assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
assertEquals(1, memManager.getMergeBatchRowCount());
@@ -357,12 +384,26 @@ public class TestExternalSortInternals extends DrillTest {
assertFalse(memManager.isSpillNeeded(0, batchSize));
assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
+ }
- // In trouble now, can't fit even three rows.
+ @Test
+ public void testMemoryOverflow() {
+ DrillConfig drillConfig = DrillConfig.create();
+ SortConfig sortConfig = new SortConfig(drillConfig);
+ long memoryLimit = 10 * ONE_MEG;
+ SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+ // In trouble now, can't fit even two input batches.
+ // A better implementation would spill the first batch to a file,
+ // leave it open, and append the second batch. Slicing each big input
+ // batch into small spill batches will allow the sort to proceed as
+ // long as it can hold a single input batch and single merge batch. But,
+ // the current implementation requires all batches to be spilled are in
+ // memory at the same time...
- rowWidth = (int) (memoryLimit / 2);
- rowCount = 1;
- batchSize = rowWidth;
+ int rowWidth = (int) (memoryLimit / 2);
+ int rowCount = 1;
+ int batchSize = rowWidth;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
assertTrue(memManager.mayOverflow());
}
@@ -406,7 +447,7 @@ public class TestExternalSortInternals extends DrillTest {
memManager.updateEstimates(batchSize, rowWidth, rowCount);
- int spillBatchSize = memManager.getSpillBatchSize();
+ int spillBatchSize = memManager.getSpillBatchSize().dataSize;
// Test various memory fill levels
@@ -432,63 +473,67 @@ public class TestExternalSortInternals extends DrillTest {
.build();
SortConfig sortConfig = new SortConfig(drillConfig);
// Allow four spill batches, 8 MB each, plus one output of 16
- long memoryLimit = 50 * ONE_MEG;
+ // Allow for internal fragmentation
+ // 96 > (4 * 8 + 16) * 2
+ long memoryLimit = 96 * ONE_MEG;
SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
- // Prime the estimates
+ // Prime the estimates. Batch size is data size, not buffer size.
int rowWidth = 300;
int rowCount = 10000;
int batchSize = rowWidth * rowCount * 2;
memManager.updateEstimates(batchSize, rowWidth, rowCount);
- int spillBatchSize = memManager.getSpillBatchSize();
- int mergeBatchSize = memManager.getMergeBatchSize();
+ assertFalse(memManager.isLowMemory());
+ int spillBatchBufferSize = memManager.getSpillBatchSize().expectedBufferSize;
+ int inputBatchBufferSize = memManager.getInputBatchSize().expectedBufferSize;
// One in-mem batch, no merging.
- long allocMemory = memoryLimit - mergeBatchSize;
+ long allocMemory = inputBatchBufferSize;
MergeTask task = memManager.consolidateBatches(allocMemory, 1, 0);
assertEquals(MergeAction.NONE, task.action);
// Many in-mem batches, just enough to merge
- allocMemory = memoryLimit - mergeBatchSize;
- int memBatches = (int) (allocMemory / batchSize);
- allocMemory = memBatches * batchSize;
+ int memBatches = (int) (memManager.getMergeMemoryLimit() / inputBatchBufferSize);
+ allocMemory = memBatches * inputBatchBufferSize;
task = memManager.consolidateBatches(allocMemory, memBatches, 0);
assertEquals(MergeAction.NONE, task.action);
// Spills if no room for spill and in-memory batches
- task = memManager.consolidateBatches(allocMemory, memBatches, 1);
+ int spillCount = (int) Math.ceil((memManager.getMergeMemoryLimit() - allocMemory) / (1.0 * spillBatchBufferSize));
+ assertTrue(spillCount >= 1);
+ task = memManager.consolidateBatches(allocMemory, memBatches, spillCount);
assertEquals(MergeAction.SPILL, task.action);
// One more in-mem batch: now needs to spill
memBatches++;
- allocMemory = memBatches * batchSize;
+ allocMemory = memBatches * inputBatchBufferSize;
task = memManager.consolidateBatches(allocMemory, memBatches, 0);
assertEquals(MergeAction.SPILL, task.action);
// No spill for various in-mem/spill run combinations
- allocMemory = memoryLimit - spillBatchSize - mergeBatchSize;
- memBatches = (int) (allocMemory / batchSize);
- allocMemory = memBatches * batchSize;
+ long freeMem = memManager.getMergeMemoryLimit() - spillBatchBufferSize;
+ memBatches = (int) (freeMem / inputBatchBufferSize);
+ allocMemory = memBatches * inputBatchBufferSize;
task = memManager.consolidateBatches(allocMemory, memBatches, 1);
assertEquals(MergeAction.NONE, task.action);
- allocMemory = memoryLimit - 2 * spillBatchSize - mergeBatchSize;
- memBatches = (int) (allocMemory / batchSize);
- allocMemory = memBatches * batchSize;
+ freeMem = memManager.getMergeMemoryLimit() - 2 * spillBatchBufferSize;
+ memBatches = (int) (freeMem / inputBatchBufferSize);
+ allocMemory = memBatches * inputBatchBufferSize;
task = memManager.consolidateBatches(allocMemory, memBatches, 2);
assertEquals(MergeAction.NONE, task.action);
// No spill if no in-memory, only spill, and spill fits
- long freeMem = memoryLimit - mergeBatchSize;
- int spillBatches = (int) (freeMem / spillBatchSize);
+ freeMem = memManager.getMergeMemoryLimit();
+ int spillBatches = (int) (freeMem / spillBatchBufferSize);
task = memManager.consolidateBatches(0, 0, spillBatches);
assertEquals(MergeAction.NONE, task.action);
@@ -503,6 +548,47 @@ public class TestExternalSortInternals extends DrillTest {
task = memManager.consolidateBatches(0, 0, spillBatches + 2);
assertEquals(MergeAction.MERGE, task.action);
assertEquals(3, task.count);
+
+ // If only one spilled run, and no in-memory batches,
+ // skip merge.
+
+ task = memManager.consolidateBatches(0, 0, 1);
+ assertEquals(MergeAction.NONE, task.action);
+
+ // Very large number of spilled runs. Limit to what fits in memory.
+
+ task = memManager.consolidateBatches(0, 0, 1000);
+ assertEquals(MergeAction.MERGE, task.action);
+ assertTrue(task.count <= (int)(memoryLimit / spillBatchBufferSize) - 1);
+ }
+
+ @Test
+ public void testMergeCalcsExtreme() {
+
+ DrillConfig drillConfig = DrillConfig.create();
+ SortConfig sortConfig = new SortConfig(drillConfig);
+
+ // Force odd situation in which the spill batch is larger
+ // than memory. Won't actually run, but needed to test
+ // odd merge case.
+
+ long memoryLimit = ONE_MEG / 2;
+ SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
+
+ // Prime the estimates. Batch size is data size, not buffer size.
+
+ int rowWidth = (int) memoryLimit;
+ int rowCount = 1;
+ int batchSize = rowWidth;
+
+ memManager.updateEstimates(batchSize, rowWidth, rowCount);
+ assertTrue(memManager.getMergeMemoryLimit() < rowWidth);
+
+ // Only one spill batch, that batch is above the merge memory limit,
+ // but nothing useful comes from merging.
+
+ MergeTask task = memManager.consolidateBatches(0, 0, 1);
+ assertEquals(MergeAction.NONE, task.action);
}
@Test