You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/05/03 19:50:45 UTC
[2/5] drill git commit: DRILL-2100: Added deleting temporary spill
directories when query is finished.
DRILL-2100: Added deleting temporary spill directories when query is finished.
This closes #454
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/38e1016c
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/38e1016c
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/38e1016c
Branch: refs/heads/master
Commit: 38e1016c49786acaacb153ee37784b3ce3023eb5
Parents: 1a89a7f
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Mon Mar 28 18:05:22 2016 +0000
Committer: Parth Chandra <pa...@apache.org>
Committed: Tue May 3 10:50:09 2016 -0700
----------------------------------------------------------------------
.../physical/impl/xsort/ExternalSortBatch.java | 31 ++++++++++++++++++--
1 file changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/38e1016c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 0ee518e..32df705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -22,8 +22,10 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
@@ -72,6 +74,7 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
@@ -116,6 +119,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private boolean first = true;
private int targetRecordCount;
private final String fileName;
+ private Set<Path> currSpillDirs = Sets.newTreeSet();
private int firstSpillBatchCount = 0;
private int peakNumBatches = -1;
@@ -158,7 +162,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
FragmentHandle handle = context.getHandle();
- fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+ fileName = String.format("%s_majorfragment%s_minorfragment%s_operator%s", QueryIdHelper.getQueryId(handle.getQueryId()),
handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
}
@@ -223,7 +227,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (mSorter != null) {
mSorter.clear();
}
-
+ for(Iterator iter = this.currSpillDirs.iterator(); iter.hasNext(); iter.remove()) {
+ Path path = (Path)iter.next();
+ try {
+ if (fs != null && path != null && fs.exists(path)) {
+ if (fs.delete(path, true)) {
+ fs.cancelDeleteOnExit(path);
+ }
+ }
+ } catch (IOException e) {
+ // since this is meant to be used in a batch's cleanup, we don't propagate the exception
+ logger.warn("Unable to delete spill directory " + path, e);
+ }
+ }
}
}
@@ -554,7 +570,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
c1.setRecordCount(count);
- String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
+ String spillDir = dirs.next();
+ Path currSpillPath = new Path(Joiner.on("/").join(spillDir, fileName));
+ currSpillDirs.add(currSpillPath);
+ String outputFile = Joiner.on("/").join(currSpillPath, spillCount++);
+ try {
+ fs.deleteOnExit(currSpillPath);
+ } catch (IOException e) {
+ // since this is meant to be used in a batch's spilling, we don't propagate the exception
+ logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", e);
+ }
stats.setLongStat(Metric.SPILL_COUNT, spillCount);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext);
try (AutoCloseable a = AutoCloseables.all(batchGroupList)) {