You are viewing a plain text version of this content. The canonical link to it (shown as "here" in the original) was lost in this plain-text rendering.
Posted to commits@drill.apache.org by am...@apache.org on 2015/09/18 02:27:46 UTC
drill git commit: DRILL-3779: Fix NPE in mergeAndSpill(). Add more
debug logging messages. Make copier per-batch memory limit a power of 2. Get
rid of some warnings. Add a few comments.
Repository: drill
Updated Branches:
refs/heads/master e52d473eb -> 813903a34
DRILL-3779: Fix NPE in mergeAndSpill(). Add more debug logging messages. Make copier per-batch memory limit a power of 2.
Get rid of some warnings. Add a few comments.
Address review comments.
Close apache/drill#160
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/813903a3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/813903a3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/813903a3
Branch: refs/heads/master
Commit: 813903a34ea1c9c3fec28f2472312c8785f780c5
Parents: e52d473
Author: Aman Sinha <as...@maprtech.com>
Authored: Thu Sep 17 01:29:38 2015 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Thu Sep 17 17:26:43 2015 -0700
----------------------------------------------------------------------
.../physical/impl/xsort/ExternalSortBatch.java | 63 +++++++++++++++-----
1 file changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/813903a3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 31deada..f1e22b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -100,6 +100,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private SingleBatchSorter sorter;
private SortRecordBatchBuilder builder;
private MSorter mSorter;
+ /**
+ * A single PriorityQueueCopier instance is used for 2 purposes:
+ * 1. Merge sorted batches before spilling
+ * 2. Merge sorted batches when all incoming data fits in memory
+ */
private PriorityQueueCopier copier;
private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
@@ -114,6 +119,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private final String fileName;
private int firstSpillBatchCount = 0;
+ /**
+ * The copier uses the COPIER_BATCH_MEM_LIMIT to estimate the target
+ * number of records to return in each batch.
+ */
+ private static final int COPIER_BATCH_MEM_LIMIT = 256 * 1024;
+
public static final String INTERRUPTION_AFTER_SORT = "after-sort";
public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
@@ -205,7 +216,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
switch (outcome) {
case OK:
case OK_NEW_SCHEMA:
- for (VectorWrapper w : incoming) {
+ for (VectorWrapper<?> w : incoming) {
ValueVector v = container.addOrGet(w.getField());
if (v instanceof AbstractContainerVector) {
w.getValueVector().makeTransferPair(v); // Can we remove this hack?
@@ -225,6 +236,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
case NONE:
state = BatchState.DONE;
break;
+ default:
+ break;
}
}
@@ -291,7 +304,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
first = false;
}
if (incoming.getRecordCount() == 0) {
- for (VectorWrapper w : incoming) {
+ for (VectorWrapper<?> w : incoming) {
w.clear();
}
break;
@@ -345,7 +358,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
logger.info("Merging spills");
- spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+ final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
+ if (merged != null) {
+ spilledBatchGroups.addFirst(merged);
+ }
}
final BatchGroup merged = mergeAndSpill(batchGroups);
if (merged != null) { // make sure we don't add null to spilledBatchGroups
@@ -366,8 +382,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
logger.debug("received OUT_OF_MEMORY, trying to spill");
highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) {
- spilledBatchGroups.add(mergeAndSpill(batchGroups));
- batchesSinceLastSpill = 0;
+ final BatchGroup merged = mergeAndSpill(batchGroups);
+ if (merged != null) {
+ spilledBatchGroups.add(merged);
+ batchesSinceLastSpill = 0;
+ }
} else {
logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
return IterOutcome.OUT_OF_MEMORY;
@@ -418,7 +437,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
long t = watch.elapsed(TimeUnit.MICROSECONDS);
// logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
container.buildSchema(SelectionVectorMode.FOUR_BYTE);
- } else {
+ } else { // some batches were spilled
final BatchGroup merged = mergeAndSpill(batchGroups);
if (merged != null) {
spilledBatchGroups.add(merged);
@@ -430,14 +449,14 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
createCopier(hyperBatch, batchGroups, container, false);
int estimatedRecordSize = 0;
- for (VectorWrapper w : batchGroups.get(0)) {
+ for (VectorWrapper<?> w : batchGroups.get(0)) {
try {
estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
} catch (UnsupportedOperationException e) {
estimatedRecordSize += 50;
}
}
- targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, 250 * 1000 / estimatedRecordSize));
+ targetRecordCount = Math.min(MAX_BATCH_SIZE, Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize));
int count = copier.next(targetRecordCount);
container.buildSchema(SelectionVectorMode.NONE);
container.setRecordCount(count);
@@ -470,6 +489,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
+ logger.debug("mergeAndSpill: starting totalSizeInMemory = {}", totalSizeInMemory);
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
int batchCount = batchGroups.size();
@@ -478,28 +498,36 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
break;
}
BatchGroup batch = batchGroups.pollLast();
+ assert batch != null : "Encountered a null batch during merge and spill operation";
batchGroupList.add(batch);
long bufferSize = getBufferSize(batch);
+ logger.debug("mergeAndSpill: buffer size for batch {} = {}", i, bufferSize);
totalSizeInMemory -= bufferSize;
}
+ logger.debug("mergeAndSpill: intermediate estimated total size in memory = {}", totalSizeInMemory);
+
if (batchGroupList.size() == 0) {
return null;
}
int estimatedRecordSize = 0;
- for (VectorWrapper w : batchGroupList.get(0)) {
+ for (VectorWrapper<?> w : batchGroupList.get(0)) {
try {
estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
} catch (UnsupportedOperationException e) {
estimatedRecordSize += 50;
}
}
- int targetRecordCount = Math.max(1, 250 * 1000 / estimatedRecordSize);
+ int targetRecordCount = Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize);
VectorContainer hyperBatch = constructHyperBatch(batchGroupList);
createCopier(hyperBatch, batchGroupList, outputContainer, true);
int count = copier.next(targetRecordCount);
assert count > 0;
+ logger.debug("mergeAndSpill: estimated record size = {}, target record count = {}", estimatedRecordSize, targetRecordCount);
+
+ // 1 output container is kept in memory, so we want to hold on to it and transferClone
+ // allows keeping ownership
VectorContainer c1 = VectorContainer.getTransferClone(outputContainer);
c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
c1.setRecordCount(count);
@@ -512,6 +540,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
while ((count = copier.next(targetRecordCount)) > 0) {
outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
outputContainer.setRecordCount(count);
+ // note that addBatch also clears the outputContainer
newGroup.addBatch(outputContainer);
}
newGroup.closeOutputStream();
@@ -522,14 +551,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} catch (IOException e) {
throw new RuntimeException(e);
}
- takeOwnership(c1);
- totalSizeInMemory += getBufferSize(c1);
+ takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
+ long bufSize = getBufferSize(c1);
+ totalSizeInMemory += bufSize;
+ logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
logger.info("Completed spilling to {}", outputFile);
return newGroup;
}
private void takeOwnership(VectorAccessible batch) {
- for (VectorWrapper w : batch) {
+ for (VectorWrapper<?> w : batch) {
DrillBuf[] bufs = w.getValueVector().getBuffers(false);
for (DrillBuf buf : bufs) {
if (buf.isRootBuffer()) {
@@ -541,7 +572,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private long getBufferSize(VectorAccessible batch) {
long size = 0;
- for (VectorWrapper w : batch) {
+ for (VectorWrapper<?> w : batch) {
DrillBuf[] bufs = w.getValueVector().getBuffers(false);
for (DrillBuf buf : bufs) {
if (buf.isRootBuffer()) {
@@ -558,7 +589,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
try {
spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
} catch (SchemaChangeException e) {
- throw new RuntimeException();
+ throw new RuntimeException(e);
}
batchesSinceLastSpill = 0;
int waitTime = 1;
@@ -659,7 +690,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return context.getImplementationClass(cg);
}
- private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
+ private void generateComparisons(ClassGenerator<?> g, VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
for (Ordering od : popConfig.getOrderings()) {