You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/15 10:30:14 UTC

[3/3] drill git commit: DRILL-2728: Merge spill files when number gets too large

DRILL-2728: Merge spill files when number gets too large


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/859e6a86
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/859e6a86
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/859e6a86

Branch: refs/heads/master
Commit: 859e6a86b1c11bc30c5d1d91d176503b4f5cefbe
Parents: 959419d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Mar 17 00:29:51 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 18:22:02 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/xsort/BatchGroup.java    |  1 +
 .../physical/impl/xsort/ExternalSortBatch.java  | 33 +++++++++++---------
 2 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 9359ea1..6896faa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,6 +143,7 @@ public class BatchGroup implements VectorAccessible {
   }
 
   public void cleanup() throws IOException {
+    currentContainer.zeroVectors();
     if (sv2 != null) {
       sv2.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index dbfd1a5..bd3c4e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -107,6 +107,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
   private final String fileName;
+  private int firstSpillBatchCount = 0;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, true);
@@ -276,9 +277,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           }
           int count = sv2.getCount();
           totalCount += count;
-//          if (count == 0) {
-//            break outer;
-//          }
           sorter.setup(context, sv2, incoming);
           Stopwatch w = new Stopwatch();
           w.start();
@@ -302,7 +300,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
               // since the last spill exceed the defined limit
               (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
 
-            mergeAndSpill();
+            if (firstSpillBatchCount == 0) {
+              firstSpillBatchCount = batchGroups.size();
+            }
+
+            if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+              logger.info("Merging spills");
+              spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+            }
+            spilledBatchGroups.add(mergeAndSpill(batchGroups));
             batchesSinceLastSpill = 0;
           }
           long t = w.elapsed(TimeUnit.MICROSECONDS);
@@ -311,7 +317,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         case OUT_OF_MEMORY:
           highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) {
-            mergeAndSpill();
+            spilledBatchGroups.add(mergeAndSpill(batchGroups));
           }
           batchesSinceLastSpill = 0;
           break;
@@ -347,7 +353,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 //        logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
         container.buildSchema(SelectionVectorMode.FOUR_BYTE);
       } else {
-        mergeAndSpill();
+        spilledBatchGroups.add(mergeAndSpill(batchGroups));
         batchGroups.addAll(spilledBatchGroups);
         logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
         VectorContainer hyperBatch = constructHyperBatch(batchGroups);
@@ -388,7 +394,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return currentlyAvailable > neededForInMemorySort;
   }
 
-  public void mergeAndSpill() throws SchemaChangeException {
+  public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
@@ -397,19 +403,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       if (batchGroups.size() == 0) {
         break;
       }
-      if (batchGroups.peekLast().getSv2() == null) {
-        break;
-      }
       BatchGroup batch = batchGroups.pollLast();
       batchGroupList.add(batch);
       long bufferSize = getBufferSize(batch);
       totalSizeInMemory -= bufferSize;
     }
     if (batchGroupList.size() == 0) {
-      return;
+      return null;
     }
     int estimatedRecordSize = 0;
-    for (VectorWrapper w : batchGroups.get(0)) {
+    for (VectorWrapper w : batchGroupList.get(0)) {
       try {
         estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
       } catch (UnsupportedOperationException e) {
@@ -430,6 +433,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
 
+    logger.info("Merging and spilling to {}", outputFile);
     try {
       while ((count = copier.next(targetRecordCount)) > 0) {
         outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -437,7 +441,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         newGroup.addBatch(outputContainer);
       }
       newGroup.closeOutputStream();
-      spilledBatchGroups.add(newGroup);
       for (BatchGroup group : batchGroupList) {
         group.cleanup();
       }
@@ -447,6 +450,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     takeOwnership(c1);
     totalSizeInMemory += getBufferSize(c1);
+    logger.info("Completed spilling to {}", outputFile);
+    return newGroup;
   }
 
   private void takeOwnership(VectorAccessible batch) {
@@ -477,7 +482,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
     if (!sv2.allocateNew(incoming.getRecordCount())) {
       try {
-        mergeAndSpill();
+        spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
       } catch (SchemaChangeException e) {
         throw new RuntimeException();
       }