You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/25 21:08:01 UTC

[12/14] git commit: Fix for sort ordering: canonicalize incoming batches in top-n

Fix for sort ordering: canonicalize incoming batches in top-n


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0b296c82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0b296c82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0b296c82

Branch: refs/heads/master
Commit: 0b296c82e5e7b07de1266221f42761a590a476fa
Parents: 63d79a1
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Jun 23 12:11:19 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed Jun 25 09:10:13 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/physical/impl/TopN/TopNBatch.java  | 7 +++++--
 .../drill/exec/physical/impl/sort/RecordBatchData.java       | 6 ++++++
 .../exec/physical/impl/sort/SortRecordBatchBuilder.java      | 8 +++++++-
 exec/java-exec/src/main/resources/drill-module.conf          | 2 +-
 4 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 846d419..77be4ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -170,10 +170,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         case OK:
           countSincePurge += incoming.getRecordCount();
           batchCount++;
+          RecordBatchData batch = new RecordBatchData(incoming);
+          batch.canonicalize();
           if (priorityQueue == null) {
-            priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(incoming), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+            priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
           }
-          priorityQueue.add(context, new RecordBatchData(incoming));
+          priorityQueue.add(context, batch);
           if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
             purge();
             countSincePurge = 0;
@@ -242,6 +244,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     selectionVector4.clear();
     c.clear();
     VectorContainer newQueue = new VectorContainer();
+    builder.canonicalize();
     builder.build(context, newQueue);
     priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
     builder.getSv4().clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 9cb6f79..02cad5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -60,6 +60,12 @@ public class RecordBatchData {
     container.setRecordCount(recordCount);
     container.buildSchema(batch.getSchema().getSelectionVectorMode());
   }
+
+  public void canonicalize() {
+    SelectionVectorMode mode = container.getSchema().getSelectionVectorMode();
+    container = VectorContainer.canonicalize(container);
+    container.buildSchema(mode);
+  }
   
   public int getRecordCount(){
     return recordCount;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index ba200f6..9626a97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -112,6 +112,12 @@ public class SortRecordBatchBuilder {
     return true;
   }
 
+  public void canonicalize() {
+    for (RecordBatchData batch : batches.values()) {
+      batch.canonicalize();
+    }
+  }
+
   public boolean isEmpty(){
     return batches.isEmpty();
   }
@@ -165,7 +171,7 @@ public class SortRecordBatchBuilder {
       }
     }
 
-    for(MaterializedField f : vectors.keySet()){
+    for(MaterializedField f : schema){
       List<ValueVector> v = vectors.get(f);
       outputContainer.addHyperList(v, false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0b296c82/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7f399b2..1519327 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -38,7 +38,7 @@ drill.exec: {
           count: 7200,
           delay: 500
         },
-        threads: 1
+        threads: 4
       }
     },
   	use.ip : false