You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/15 10:30:14 UTC
[3/3] drill git commit: DRILL-2728: Merge spill files when number gets too large
DRILL-2728: Merge spill files when number gets too large
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/859e6a86
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/859e6a86
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/859e6a86
Branch: refs/heads/master
Commit: 859e6a86b1c11bc30c5d1d91d176503b4f5cefbe
Parents: 959419d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Mar 17 00:29:51 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 18:22:02 2015 -0700
----------------------------------------------------------------------
.../exec/physical/impl/xsort/BatchGroup.java | 1 +
.../physical/impl/xsort/ExternalSortBatch.java | 33 +++++++++++---------
2 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 9359ea1..6896faa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,6 +143,7 @@ public class BatchGroup implements VectorAccessible {
}
public void cleanup() throws IOException {
+ currentContainer.zeroVectors();
if (sv2 != null) {
sv2.clear();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index dbfd1a5..bd3c4e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -107,6 +107,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
private final String fileName;
+ private int firstSpillBatchCount = 0;
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true);
@@ -276,9 +277,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
int count = sv2.getCount();
totalCount += count;
-// if (count == 0) {
-// break outer;
-// }
sorter.setup(context, sv2, incoming);
Stopwatch w = new Stopwatch();
w.start();
@@ -302,7 +300,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// since the last spill exceed the defined limit
(batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
- mergeAndSpill();
+ if (firstSpillBatchCount == 0) {
+ firstSpillBatchCount = batchGroups.size();
+ }
+
+ if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+ logger.info("Merging spills");
+ spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+ }
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
batchesSinceLastSpill = 0;
}
long t = w.elapsed(TimeUnit.MICROSECONDS);
@@ -311,7 +317,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
case OUT_OF_MEMORY:
highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) {
- mergeAndSpill();
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
}
batchesSinceLastSpill = 0;
break;
@@ -347,7 +353,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
container.buildSchema(SelectionVectorMode.FOUR_BYTE);
} else {
- mergeAndSpill();
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
batchGroups.addAll(spilledBatchGroups);
logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups);
@@ -388,7 +394,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return currentlyAvailable > neededForInMemorySort;
}
- public void mergeAndSpill() throws SchemaChangeException {
+ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
@@ -397,19 +403,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (batchGroups.size() == 0) {
break;
}
- if (batchGroups.peekLast().getSv2() == null) {
- break;
- }
BatchGroup batch = batchGroups.pollLast();
batchGroupList.add(batch);
long bufferSize = getBufferSize(batch);
totalSizeInMemory -= bufferSize;
}
if (batchGroupList.size() == 0) {
- return;
+ return null;
}
int estimatedRecordSize = 0;
- for (VectorWrapper w : batchGroups.get(0)) {
+ for (VectorWrapper w : batchGroupList.get(0)) {
try {
estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
} catch (UnsupportedOperationException e) {
@@ -430,6 +433,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
+ logger.info("Merging and spilling to {}", outputFile);
try {
while ((count = copier.next(targetRecordCount)) > 0) {
outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -437,7 +441,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
newGroup.addBatch(outputContainer);
}
newGroup.closeOutputStream();
- spilledBatchGroups.add(newGroup);
for (BatchGroup group : batchGroupList) {
group.cleanup();
}
@@ -447,6 +450,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
takeOwnership(c1);
totalSizeInMemory += getBufferSize(c1);
+ logger.info("Completed spilling to {}", outputFile);
+ return newGroup;
}
private void takeOwnership(VectorAccessible batch) {
@@ -477,7 +482,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
if (!sv2.allocateNew(incoming.getRecordCount())) {
try {
- mergeAndSpill();
+ spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
} catch (SchemaChangeException e) {
throw new RuntimeException();
}