Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/15 10:30:12 UTC

[1/3] drill git commit: DRILL-2577: Use DrillPathFilter in FooterGatherer

Repository: drill
Updated Branches:
  refs/heads/master 7cee11c2b -> 859e6a86b


DRILL-2577: Use DrillPathFilter in FooterGatherer


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/96943de3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/96943de3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/96943de3

Branch: refs/heads/master
Commit: 96943de36128c9ceb2a8e8c5212cb2b3e4101e26
Parents: 7cee11c
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Mar 25 18:35:20 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 16:47:15 2015 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/FooterGatherer.java      |  3 +-
 .../exec/store/parquet/TestParquetScan.java     | 66 ++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/96943de3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index 6498fd2..0bb86e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -82,7 +83,7 @@ public class FooterGatherer {
         }
 
         // else we handle as normal file.
-        for(FileStatus inStatus : fs.listStatus(status.getPath())){
+        for(FileStatus inStatus : fs.listStatus(status.getPath(), new DrillPathFilter())){
           readers.add(new FooterReader(conf, inStatus));
         }
       }else{
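
The test added below exercises this change by placing _SUCCESS and _logs entries next to a data file. DrillPathFilter itself is not part of the diff; broadly, it rejects Hadoop metadata and hidden entries (names starting with an underscore or a dot) so that only real Parquet files reach the footer readers. A minimal stand-in with the same effect might look like the following sketch; the class name is hypothetical, the real filter lives in org.apache.drill.exec.store.dfs.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Simplified sketch of a DrillPathFilter-style filter: skip metadata and
    // hidden entries such as _SUCCESS, _logs and .crc files.
    public class MetadataSkippingPathFilter implements PathFilter {
      @Override
      public boolean accept(Path path) {
        String name = path.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    }

    // Usage, mirroring the change above:
    // for (FileStatus inStatus : fs.listStatus(status.getPath(), new MetadataSkippingPathFilter())) { ... }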

http://git-wip-us.apache.org/repos/asf/drill/blob/96943de3/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java
new file mode 100644
index 0000000..69759e6
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetScan.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.google.common.io.Resources;
+import org.apache.drill.BaseTestQuery;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestParquetScan extends BaseTestQuery {
+
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void initFs() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.default.name", "local");
+
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testSuccessFile() throws Exception {
+    Path p = new Path("/tmp/nation_test_parquet_scan");
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+
+    fs.mkdirs(p);
+
+    byte[] bytes = Resources.toByteArray(Resources.getResource("tpch/nation.parquet"));
+
+    FSDataOutputStream os = fs.create(new Path(p, "nation.parquet"));
+    os.write(bytes);
+    os.close();
+    fs.create(new Path(p, "_SUCCESS")).close();
+    fs.create(new Path(p, "_logs")).close();
+
+    testBuilder()
+        .sqlQuery("select count(*) c from dfs.tmp.nation_test_parquet_scan where 1 = 1")
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(25L)
+        .build()
+        .run();
+  }
+}


[3/3] drill git commit: DRILL-2728: Merge spill files when number gets too large

Posted by sm...@apache.org.
DRILL-2728: Merge spill files when number gets too large


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/859e6a86
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/859e6a86
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/859e6a86

Branch: refs/heads/master
Commit: 859e6a86b1c11bc30c5d1d91d176503b4f5cefbe
Parents: 959419d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Mar 17 00:29:51 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 18:22:02 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/xsort/BatchGroup.java    |  1 +
 .../physical/impl/xsort/ExternalSortBatch.java  | 33 +++++++++++---------
 2 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 9359ea1..6896faa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,6 +143,7 @@ public class BatchGroup implements VectorAccessible {
   }
 
   public void cleanup() throws IOException {
+    currentContainer.zeroVectors();
     if (sv2 != null) {
       sv2.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/859e6a86/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index dbfd1a5..bd3c4e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -107,6 +107,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
   private final String fileName;
+  private int firstSpillBatchCount = 0;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, true);
@@ -276,9 +277,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           }
           int count = sv2.getCount();
           totalCount += count;
-//          if (count == 0) {
-//            break outer;
-//          }
           sorter.setup(context, sv2, incoming);
           Stopwatch w = new Stopwatch();
           w.start();
@@ -302,7 +300,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
               // since the last spill exceed the defined limit
               (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
 
-            mergeAndSpill();
+            if (firstSpillBatchCount == 0) {
+              firstSpillBatchCount = batchGroups.size();
+            }
+
+            if (spilledBatchGroups.size() > firstSpillBatchCount / 2) {
+              logger.info("Merging spills");
+              spilledBatchGroups.addFirst(mergeAndSpill(spilledBatchGroups));
+            }
+            spilledBatchGroups.add(mergeAndSpill(batchGroups));
             batchesSinceLastSpill = 0;
           }
           long t = w.elapsed(TimeUnit.MICROSECONDS);
@@ -311,7 +317,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         case OUT_OF_MEMORY:
           highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) {
-            mergeAndSpill();
+            spilledBatchGroups.add(mergeAndSpill(batchGroups));
           }
           batchesSinceLastSpill = 0;
           break;
@@ -347,7 +353,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 //        logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
         container.buildSchema(SelectionVectorMode.FOUR_BYTE);
       } else {
-        mergeAndSpill();
+        spilledBatchGroups.add(mergeAndSpill(batchGroups));
         batchGroups.addAll(spilledBatchGroups);
         logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
         VectorContainer hyperBatch = constructHyperBatch(batchGroups);
@@ -388,7 +394,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return currentlyAvailable > neededForInMemorySort;
   }
 
-  public void mergeAndSpill() throws SchemaChangeException {
+  public BatchGroup mergeAndSpill(LinkedList<BatchGroup> batchGroups) throws SchemaChangeException {
     logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
     VectorContainer outputContainer = new VectorContainer();
     List<BatchGroup> batchGroupList = Lists.newArrayList();
@@ -397,19 +403,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       if (batchGroups.size() == 0) {
         break;
       }
-      if (batchGroups.peekLast().getSv2() == null) {
-        break;
-      }
       BatchGroup batch = batchGroups.pollLast();
       batchGroupList.add(batch);
       long bufferSize = getBufferSize(batch);
       totalSizeInMemory -= bufferSize;
     }
     if (batchGroupList.size() == 0) {
-      return;
+      return null;
     }
     int estimatedRecordSize = 0;
-    for (VectorWrapper w : batchGroups.get(0)) {
+    for (VectorWrapper w : batchGroupList.get(0)) {
       try {
         estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
       } catch (UnsupportedOperationException e) {
@@ -430,6 +433,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
 
+    logger.info("Merging and spilling to {}", outputFile);
     try {
       while ((count = copier.next(targetRecordCount)) > 0) {
         outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -437,7 +441,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         newGroup.addBatch(outputContainer);
       }
       newGroup.closeOutputStream();
-      spilledBatchGroups.add(newGroup);
       for (BatchGroup group : batchGroupList) {
         group.cleanup();
       }
@@ -447,6 +450,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     takeOwnership(c1);
     totalSizeInMemory += getBufferSize(c1);
+    logger.info("Completed spilling to {}", outputFile);
+    return newGroup;
   }
 
   private void takeOwnership(VectorAccessible batch) {
@@ -477,7 +482,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
     if (!sv2.allocateNew(incoming.getRecordCount())) {
       try {
-        mergeAndSpill();
+        spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
       } catch (SchemaChangeException e) {
         throw new RuntimeException();
       }
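
In prose, the change works like this: mergeAndSpill now takes the list of groups to drain and returns the resulting on-disk BatchGroup instead of appending it internally, so the same routine can consolidate either the in-memory groups or the already-spilled ones. The operator remembers how many in-memory groups existed at the first spill (firstSpillBatchCount); whenever the number of spill files grows past half of that, the existing spill files are merged into a single new file before the next spill is written. A minimal sketch of that policy, with the Drill types replaced by a generic SpillFile placeholder, follows.

    import java.util.LinkedList;

    // Sketch of the spill-consolidation policy; SpillFile stands in for BatchGroup.
    class SpillConsolidationSketch {
      private final LinkedList<SpillFile> spilledGroups = new LinkedList<>();
      private int firstSpillBatchCount = 0;

      void spill(LinkedList<SpillFile> inMemoryGroups) {
        // Yardstick for "too many spill files": the group count at the first spill.
        if (firstSpillBatchCount == 0) {
          firstSpillBatchCount = inMemoryGroups.size();
        }
        // Too many spill files already? Merge them into one before spilling again.
        if (spilledGroups.size() > firstSpillBatchCount / 2) {
          spilledGroups.addFirst(mergeAndSpill(spilledGroups));
        }
        // Then spill the current in-memory groups as usual.
        spilledGroups.add(mergeAndSpill(inMemoryGroups));
      }

      // Drains the given groups into one new spill file and returns its handle,
      // mirroring the reworked mergeAndSpill(LinkedList<BatchGroup>) above.
      private SpillFile mergeAndSpill(LinkedList<SpillFile> groups) {
        SpillFile merged = new SpillFile();
        groups.clear();
        return merged;
      }

      static class SpillFile {}
    }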


[2/3] drill git commit: DRILL-2730: Use different paths for ExternalSort spills

Posted by sm...@apache.org.
DRILL-2730: Use different paths for ExternalSort spills


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/959419de
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/959419de
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/959419de

Branch: refs/heads/master
Commit: 959419dec8de5bc494557167e22f9d264f30ae7a
Parents: 96943de
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sat Mar 14 20:31:21 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue Apr 14 18:17:11 2015 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java  | 29 +++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/959419de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index a23780e..dbfd1a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
+import com.google.common.base.Joiner;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
@@ -47,6 +48,8 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -58,7 +61,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -100,11 +102,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private FileSystem fs;
   private int spillCount = 0;
   private int batchesSinceLastSpill = 0;
-  private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
   private boolean first = true;
   private long totalSizeInMemory = 0;
   private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
+  private final String fileName;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, true);
@@ -121,9 +123,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
     SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
     dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
-    uid = System.nanoTime();
     copierAllocator = oContext.getAllocator().getChildAllocator(
         context, PriorityQueueCopier.initialAllocation, PriorityQueueCopier.maxAllocation, true);
+    FragmentHandle handle = context.getHandle();
+    fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+        handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
   }
 
   @Override
@@ -423,7 +427,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     c1.setRecordCount(count);
 
-    String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++));
+    String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
 
     try {
@@ -648,4 +652,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     incoming.kill(sendUpstream);
   }
 
+  private String getFileName(int spill) {
+     /*
+     * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
+     * which we will dump the incoming buffer data
+     */
+    FragmentHandle handle = context.getHandle();
+
+    String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+    int majorFragmentId = handle.getMajorFragmentId();
+    int minorFragmentId = handle.getMinorFragmentId();
+
+    String fileName = String.format("%s//%s//major_fragment_%s//minor_fragment_%s//operator_%s//%s", dirs.next(), qid, majorFragmentId, minorFragmentId, popConfig.getOperatorId(), spill);
+
+    return fileName;
+  }
+
 }
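
The net effect: instead of a nanotime-based uid, each sort now spills under a path of the form <spill dir>/<query id>/major_fragment_<id>/minor_fragment_<id>/operator_<id>/<spill number>, cycling through the configured spill directories, so spill files from different queries, fragments and operators end up in distinct directories. A small self-contained illustration of the naming scheme is below; the directory names and ids are made-up example values, and the cycle() helper stands in for Guava's Iterators.cycle used in the patch.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class SpillPathExample {
      public static void main(String[] args) {
        Iterator<String> dirs = cycle(Arrays.asList("/tmp/drill/spill1", "/tmp/drill/spill2"));
        // Built once per operator from the fragment handle (query id, major/minor fragment id, operator id).
        String fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s",
            "2aa98add-59a9-4ba5-bcf7-fd2a9225b8f5", 1, 3, 7);
        int spillCount = 0;
        // Each spill takes the next directory in the cycle and appends the running spill number.
        for (int i = 0; i < 3; i++) {
          String outputFile = String.join("/", dirs.next(), fileName, String.valueOf(spillCount++));
          System.out.println(outputFile);
        }
      }

      // Simple stand-in for Guava's Iterators.cycle.
      private static <T> Iterator<T> cycle(List<T> items) {
        return new Iterator<T>() {
          private int i = 0;
          public boolean hasNext() { return true; }
          public T next() { return items.get(i++ % items.size()); }
        };
      }
    }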