You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2016/01/06 00:37:50 UTC
drill git commit: DRILL-4236: ExternalSort should use the new
allocator functionality to better manage its memory usage
Repository: drill
Updated Branches:
refs/heads/master e4372f224 -> 884c5b095
DRILL-4236: ExternalSort should use the new allocator functionality to better manage its memory usage
this closes #317
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/884c5b09
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/884c5b09
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/884c5b09
Branch: refs/heads/master
Commit: 884c5b09554d7d74e0709b61c37ade319665a850
Parents: e4372f2
Author: adeneche <ad...@gmail.com>
Authored: Wed Dec 30 14:41:20 2015 -0800
Committer: adeneche <ad...@gmail.com>
Committed: Tue Jan 5 15:37:16 2016 -0800
----------------------------------------------------------------------
.../drill/common/exceptions/UserException.java | 13 +++-
.../physical/impl/xsort/ExternalSortBatch.java | 77 +++++++-------------
2 files changed, 36 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/884c5b09/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 4473ee5..35e71d1 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -50,7 +50,16 @@ public class UserException extends DrillRuntimeException {
*/
public static Builder memoryError(final Throwable cause) {
return UserException.resourceError(cause)
- .message(MEMORY_ERROR_MSG);
+ .message(MEMORY_ERROR_MSG).addContext(cause.getMessage());
+ }
+
+ public static Builder memoryError(final String format, final Object... args) {
+ Builder builder = UserException.resourceError();
+ builder.message(MEMORY_ERROR_MSG);
+ if (!format.isEmpty()) {
+ builder.addContext(String.format(format, args));
+ }
+ return builder;
}
/**
@@ -59,7 +68,7 @@ public class UserException extends DrillRuntimeException {
* @return resource error builder
*/
public static Builder memoryError() {
- return memoryError(null);
+ return memoryError("");
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/884c5b09/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 6e79f01..8ea16b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
-import io.netty.buffer.DrillBuf;
-
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
@@ -95,7 +93,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private final int SPILL_THRESHOLD;
private final Iterator<String> dirs;
private final RecordBatch incoming;
- private BufferAllocator copierAllocator;
+ private final BufferAllocator oAllocator;
+ private final BufferAllocator copierAllocator;
private BatchSchema schema;
private SingleBatchSorter sorter;
@@ -114,12 +113,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private int spillCount = 0;
private int batchesSinceLastSpill = 0;
private boolean first = true;
- private long totalSizeInMemory = 0;
- private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
private final String fileName;
private int firstSpillBatchCount = 0;
- private long peakSizeInMemory = -1;
private int peakNumBatches = -1;
/**
@@ -157,7 +153,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
- copierAllocator = oContext.getAllocator().newChildAllocator(oContext.getAllocator().getName() + ":copier",
+ oAllocator = oContext.getAllocator();
+ copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
FragmentHandle handle = context.getHandle();
fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
@@ -356,17 +353,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
throw new OutOfMemoryException(e);
}
}
- totalSizeInMemory += getBufferSize(convertedBatch);
- if (peakSizeInMemory < totalSizeInMemory) {
- peakSizeInMemory = totalSizeInMemory;
- stats.setLongStat(Metric.PEAK_SIZE_IN_MEMORY, peakSizeInMemory);
- }
int count = sv2.getCount();
totalCount += count;
sorter.setup(context, sv2, convertedBatch);
sorter.sort(sv2);
- RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator());
+ RecordBatchData rbd = new RecordBatchData(convertedBatch, oAllocator);
boolean success = false;
try {
rbd.setSv2(sv2);
@@ -377,16 +369,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
batchesSinceLastSpill++;
- if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used.
- (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
- // If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
+ if (// If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be the last incoming batch?
(spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) ||
// If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address
(spillCount == 0 && totalBatches > Character.MAX_VALUE) ||
// current memory used is more than 95% of memory usage limit of this operator
- (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
- // current memory used is more than 95% of memory usage limit of this fragment
- (totalSizeInMemory > .95 * oContext.getAllocator().getLimit()) ||
+ (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) ||
// Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated
// since the last spill exceed the defined limit
(batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
@@ -417,7 +405,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
break;
case OUT_OF_MEMORY:
logger.debug("received OUT_OF_MEMORY, trying to spill");
- highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) {
final BatchGroup merged = mergeAndSpill(batchGroups);
if (merged != null) {
@@ -443,10 +430,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
builder.clear();
builder.close();
}
- builder = new SortRecordBatchBuilder(oContext.getAllocator());
+ builder = new SortRecordBatchBuilder(oAllocator);
for (BatchGroup group : batchGroups) {
- RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator());
+ RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator);
rbd.setSv2(group.getSv2());
builder.add(rbd);
}
@@ -454,7 +441,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
builder.build(context, container);
sv4 = builder.getSv4();
mSorter = createNewMSorter();
- mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
+ mSorter.setup(context, oAllocator, getSelectionVector4(), this.container);
// For testing memory-leak purpose, inject exception after mSorter finishes setup
injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP);
@@ -478,7 +465,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
batchGroups.addAll(spilledBatchGroups);
spilledBatchGroups = null; // no need to cleanup spilledBatchGroups, all it's batches are in batchGroups now
- logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
+ logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oAllocator.getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups);
createCopier(hyperBatch, batchGroups, container, false);
@@ -513,7 +500,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
private boolean hasMemoryForInMemorySort(int currentRecordCount) {
- long currentlyAvailable = popConfig.getMaxAllocation() - oContext.getAllocator().getAllocatedMemory();
+ long currentlyAvailable = popConfig.getMaxAllocation() - oAllocator.getAllocatedMemory();
long neededForInMemorySort = SortRecordBatchBuilder.memoryNeeded(currentRecordCount) +
MSortTemplate.memoryNeeded(currentRecordCount);
@@ -523,7 +510,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
- logger.debug("mergeAndSpill: starting totalSizeInMemory = {}", totalSizeInMemory);
+ logger.debug("mergeAndSpill: starting total size in memory = {}", oAllocator.getAllocatedMemory());
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
int batchCount = batchGroups.size();
@@ -534,11 +521,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
BatchGroup batch = batchGroups.pollLast();
assert batch != null : "Encountered a null batch during merge and spill operation";
batchGroupList.add(batch);
- long bufferSize = getBufferSize(batch);
- logger.debug("mergeAndSpill: buffer size for batch {} = {}", i, bufferSize);
- totalSizeInMemory -= bufferSize;
}
- logger.debug("mergeAndSpill: intermediate estimated total size in memory = {}", totalSizeInMemory);
if (batchGroupList.size() == 0) {
return null;
@@ -588,36 +571,26 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} finally {
hyperBatch.clear();
}
- long bufSize = getBufferSize(c1);
- totalSizeInMemory += bufSize;
- logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
+ logger.debug("mergeAndSpill: final total size in memory = {}", oAllocator.getAllocatedMemory());
logger.info("Completed spilling to {}", outputFile);
return newGroup;
}
- private long getBufferSize(VectorAccessible batch) {
- long size = 0;
- for (VectorWrapper<?> w : batch) {
- DrillBuf[] bufs = w.getValueVector().getBuffers(false);
- for (DrillBuf buf : bufs) {
- size += buf.getPossibleMemoryConsumed();
- }
- }
- return size;
- }
-
private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
- SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
+ SelectionVector2 sv2 = new SelectionVector2(oAllocator);
if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
try {
- // Not being able to allocate sv2 means this operator's allocator reached it's maximum capacity.
- // Spilling this.batchGroups won't help here as they are owned by upstream operator,
- // but spilling spilledBatchGroups may free enough memory
- final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
+ final BatchGroup merged = mergeAndSpill(batchGroups);
if (merged != null) {
- spilledBatchGroups.addFirst(merged);
+ spilledBatchGroups.add(merged);
} else {
- throw new OutOfMemoryException("Unable to allocate sv2, and not enough batchGroups to spill");
+ throw UserException.memoryError("Unable to allocate sv2 for %d records, and not enough batchGroups to spill.",
+ incoming.getRecordCount())
+ .addContext("batchGroups.size", batchGroups.size())
+ .addContext("spilledBatchGroups.size", spilledBatchGroups.size())
+ .addContext("allocated memory", oAllocator.getAllocatedMemory())
+ .addContext("allocator limit", oAllocator.getLimit())
+ .build(logger);
}
} catch (SchemaChangeException e) {
throw new RuntimeException(e);
@@ -771,7 +744,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
copier.close();
}
- BufferAllocator allocator = spilling ? copierAllocator : oContext.getAllocator();
+ BufferAllocator allocator = spilling ? copierAllocator : oAllocator;
for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
outputContainer.add(v);