You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/15 10:30:12 UTC
[1/3] drill git commit: DRILL-2577: Use DrillPathFilter in
FooterGatherer
Repository: drill
Updated Branches:
refs/heads/master 7cee11c2b -> 859e6a86b
DRILL-2577: Use DrillPathFilter in FooterGatherer
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/96943de3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/96943de3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/96943de3
Branch: refs/heads/master
Commit: 96943de36128c9ceb2a8e8c5212cb2b3e4101e26
Parents: 7cee11c
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Mar 25 18:35:20 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 16:47:15 2015 -0700
----------------------------------------------------------------------
.../exec/store/parquet/FooterGatherer.java | 3 +-
.../exec/store/parquet/TestParquetScan.java | 66 ++++++++++++++++++++
2 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/96943de3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index 6498fd2..0bb86e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -82,7 +83,7 @@ public class FooterGatherer {
}
// else we handle as normal file.
- for(FileStatus inStatus : fs.listStatus(status.getPath())){
+ for(FileStatus inStatus : fs.listStatus(status.getPath(), new DrillPathFilter())){
readers.add(new FooterReader(conf, inStatus));
}
}else{
http://git-wip-us.apache.org/repos/asf/drill/blob/96943de3/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java
new file mode 100644
index 0000000..69759e6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.io.Resources;
+import org.apache.drill.BaseTestQuery;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestParquetScan extends BaseTestQuery {
+ // Queries a directory holding one parquet file plus empty "_SUCCESS" and "_logs" files and checks the row count.
+ static FileSystem fs; // assigned once in initFs(), used by the test method to stage files
+
+ @BeforeClass
+ public static void initFs() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "local"); // presumably selects the local file system - TODO confirm
+
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testSuccessFile() throws Exception {
+ Path p = new Path("/tmp/nation_test_parquet_scan");
+ if (fs.exists(p)) { // start from a clean directory if the path already exists
+ fs.delete(p, true);
+ }
+
+ fs.mkdirs(p);
+ // Copy the classpath resource tpch/nation.parquet into the test directory.
+ byte[] bytes = Resources.toByteArray(Resources.getResource("tpch/nation.parquet"));
+
+ FSDataOutputStream os = fs.create(new Path(p, "nation.parquet"));
+ os.write(bytes);
+ os.close();
+ fs.create(new Path(p, "_SUCCESS")).close(); // empty file: created and closed without writing
+ fs.create(new Path(p, "_logs")).close(); // empty file: created and closed without writing
+ // The query must succeed and return exactly one row with c = 25 despite the two extra entries above.
+ testBuilder()
+ .sqlQuery("select count(*) c from dfs.tmp.nation_test_parquet_scan where 1 = 1")
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(25L) // presumably the row count of tpch/nation.parquet - TODO confirm
+ .build()
+ .run();
+ }
+}
[3/3] drill git commit: DRILL-2728: Merge spill files when number
gets too large
Posted by sm...@apache.org.
DRILL-2728: Merge spill files when number gets too large
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/859e6a86
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/859e6a86
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/859e6a86
Branch: refs/heads/master
Commit: 859e6a86b1c11bc30c5d1d91d176503b4f5cefbe
Parents: 959419d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Mar 17 00:29:51 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 18:22:02 2015 -0700
----------------------------------------------------------------------
.../exec/physical/impl/xsort/BatchGroup.java | 1 +
.../physical/impl/xsort/ExternalSortBatch.java | 33 +++++++++++---------
2 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 9359ea1..6896faa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,6 +143,7 @@ public class BatchGroup implements VectorAccessible {
}
public void cleanup() throws IOException {
+ currentContainer.zeroVectors();
if (sv2 != null) {
sv2.clear();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index dbfd1a5..bd3c4e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -107,6 +107,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
private final String fileName;
+ private int firstSpillBatchCount = 0;
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true);
@@ -276,9 +277,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
int count = sv2.getCount();
totalCount += count;
-// if (count == 0) {
-// break outer;
-// }
sorter.setup(context, sv2, incoming);
Stopwatch w = new Stopwatch();
w.start();
@@ -302,7 +300,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// since the last spill exceed the defined limit
(batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
- mergeAndSpill();
+ if (firstSpillBatchCount == 0) {
+ firstSpillBatchCount = batchGroups.size();
+ }
+
+ if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+ logger.info("Merging spills");
+ spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+ }
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
batchesSinceLastSpill = 0;
}
long t = w.elapsed(TimeUnit.MICROSECONDS);
@@ -311,7 +317,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
case OUT_OF_MEMORY:
highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) {
- mergeAndSpill();
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
}
batchesSinceLastSpill = 0;
break;
@@ -347,7 +353,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
container.buildSchema(SelectionVectorMode.FOUR_BYTE);
} else {
- mergeAndSpill();
+ spilledBatchGroups.add(mergeAndSpill(batchGroups));
batchGroups.addAll(spilledBatchGroups);
logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups);
@@ -388,7 +394,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return currentlyAvailable > neededForInMemorySort;
}
- public void mergeAndSpill() throws SchemaChangeException {
+ public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
@@ -397,19 +403,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (batchGroups.size() == 0) {
break;
}
- if (batchGroups.peekLast().getSv2() == null) {
- break;
- }
BatchGroup batch = batchGroups.pollLast();
batchGroupList.add(batch);
long bufferSize = getBufferSize(batch);
totalSizeInMemory -= bufferSize;
}
if (batchGroupList.size() == 0) {
- return;
+ return null;
}
int estimatedRecordSize = 0;
- for (VectorWrapper w : batchGroups.get(0)) {
+ for (VectorWrapper w : batchGroupList.get(0)) {
try {
estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
} catch (UnsupportedOperationException e) {
@@ -430,6 +433,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
+ logger.info("Merging and spilling to {}", outputFile);
try {
while ((count = copier.next(targetRecordCount)) > 0) {
outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -437,7 +441,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
newGroup.addBatch(outputContainer);
}
newGroup.closeOutputStream();
- spilledBatchGroups.add(newGroup);
for (BatchGroup group : batchGroupList) {
group.cleanup();
}
@@ -447,6 +450,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
takeOwnership(c1);
totalSizeInMemory += getBufferSize(c1);
+ logger.info("Completed spilling to {}", outputFile);
+ return newGroup;
}
private void takeOwnership(VectorAccessible batch) {
@@ -477,7 +482,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
if (!sv2.allocateNew(incoming.getRecordCount())) {
try {
- mergeAndSpill();
+ spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
} catch (SchemaChangeException e) {
throw new RuntimeException();
}
[2/3] drill git commit: DRILL-2730: Use different paths for
ExternalSort spills
Posted by sm...@apache.org.
DRILL-2730: Use different paths for ExternalSort spills
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/959419de
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/959419de
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/959419de
Branch: refs/heads/master
Commit: 959419dec8de5bc494557167e22f9d264f30ae7a
Parents: 96943de
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sat Mar 14 20:31:21 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 18:17:11 2015 -0700
----------------------------------------------------------------------
.../physical/impl/xsort/ExternalSortBatch.java | 29 +++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/959419de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index a23780e..dbfd1a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
+import com.google.common.base.Joiner;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -47,6 +48,8 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -58,7 +61,6 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -100,11 +102,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private FileSystem fs;
private int spillCount = 0;
private int batchesSinceLastSpill = 0;
- private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
private boolean first = true;
private long totalSizeInMemory = 0;
private long highWaterMark = Long.MAX_VALUE;
private int targetRecordCount;
+ private final String fileName;
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true);
@@ -121,9 +123,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
- uid = System.nanoTime();
copierAllocator = oContext.getAllocator().getChildAllocator(
context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
+ FragmentHandle handle = context.getHandle();
+ fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+ handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
}
@Override
@@ -423,7 +427,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
c1.setRecordCount(count);
- String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++));
+ String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
try {
@@ -648,4 +652,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
incoming.kill(sendUpstream);
}
+ private String getFileName(int spill) {
+ /*
+ * Builds "<dir>//<query id>//major_fragment_<id>//minor_fragment_<id>//operator_<id>//<spill>" from the fragment handle; <dir> is dirs.next(), so each call consumes one entry of dirs.
+ * NOTE(review): no call to this method is visible in this commit's hunks, and it joins with "//" while the constructor's fileName format and mergeAndSpill join with "/" - confirm whether it is still needed.
+ */
+ FragmentHandle handle = context.getHandle();
+
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String fileName = String.format("%s//%s//major_fragment_%s//minor_fragment_%s//operator_%s//%s", dirs.next(), qid, majorFragmentId, minorFragmentId, popConfig.getOperatorId(), spill); // local shadows the fileName field declared in this class
+
+ return fileName;
+ }
+
}