You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/22 21:16:55 UTC

git commit: - Add queue feature to support encoding as UnbackedRecord rather than JSON encoded byte arrays. - Add additional tool to TestUtils to get a set of unbacked records from example logical plan - Fix for DRILL-61: Correcting the offset of the carryover values

Updated Branches:
  refs/heads/master a38856cce -> 0f8cccb3d


- Add queue feature to support encoding as UnbackedRecord rather than JSON encoded byte arrays.
- Add additional tool to TestUtils to get a set of unbacked records from example logical plan
- Fix for DRILL-61: Correcting the offset of the carryover values
- Add test for DRILL-61 bug
- Fix for DRILL-66: Add support for nullCollation: <first | last>
- Add tests for DRILL-66 functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0f8cccb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0f8cccb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0f8cccb3

Branch: refs/heads/master
Commit: 0f8cccb3dd1f97433fcb50dd41b23be9c443d1b6
Parents: a38856c
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 22 12:16:30 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 22 12:16:30 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/common/logical/data/Order.java    |   30 ++++--
 .../exec/ref/rops/CollapsingAggregateROP.java      |   10 ++-
 .../org/apache/drill/exec/ref/rops/OrderROP.java   |   12 ++-
 .../org/apache/drill/exec/ref/rse/QueueRSE.java    |   36 ++++++--
 .../apache/drill/exec/ref/values/ScalarValues.java |    4 +-
 .../java/org/apache/drill/exec/ref/TestUtils.java  |   78 +++++++++++---
 .../exec/ref/rops/CollapsingAggregateTest.java     |   50 +++++++++
 .../apache/drill/exec/ref/rops/OrderROPTest.java   |   60 +++++++++++
 .../ref/src/test/resources/collapse/test1.json     |   71 +++++++++++++
 .../ref/src/test/resources/order/nulls-first.json  |   71 +++++++++++++
 .../ref/src/test/resources/order/nulls-last.json   |   71 +++++++++++++
 11 files changed, 450 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
index 1c8108e..36ebbd3 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/Order.java
@@ -49,17 +49,14 @@ public class Order extends SingleInputOperator {
 
     private final Direction direction;
     private final LogicalExpression expr;
-
+    private final NullCollation nulls;
+    
     @JsonCreator
-    public Ordering(@JsonProperty("order") String strOrder, @JsonProperty("expr") LogicalExpression expr) {
+    public Ordering(@JsonProperty("order") String strOrder, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("nullCollation") String nullCollation) {
       this.expr = expr;
-      this.direction = Direction.DESC.description.equals(strOrder) ? Direction.DESC : Direction.ASC; // default
-                                                                                                     // to
-                                                                                                     // ascending
-                                                                                                     // unless
-                                                                                                     // desc
-                                                                                                     // is
-                                                                                                     // provided.
+      this.nulls = NullCollation.NULLS_LAST.description.equals(nullCollation) ? NullCollation.NULLS_LAST :  NullCollation.NULLS_FIRST; // default first
+      this.direction = Direction.DESC.description.equals(strOrder) ? Direction.DESC : Direction.ASC; // default asc
+                                                                                                     
     }
 
     @JsonIgnore
@@ -75,6 +72,21 @@ public class Order extends SingleInputOperator {
       return direction.description;
     }
 
+    public NullCollation getNullCollation() {
+      return nulls;
+    }
+    
+    
+
+  }
+  public static enum NullCollation {
+    NULLS_FIRST("first"), NULLS_LAST("last");
+    
+    public final String description;
+
+    NullCollation(String d) {
+      description = d;
+    }
   }
 
   public static enum Direction {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
index 26c3d54..4861bc6 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/CollapsingAggregateROP.java
@@ -67,9 +67,9 @@ public class CollapsingAggregateROP extends SingleInputROPBase<CollapsingAggrega
     aggs = new AggregatingEvaluator[config.getAggregations().length];
     carryovers = new BasicEvaluator[config.getCarryovers().length];
     carryoverNames = new FieldReference[config.getCarryovers().length];
-    
+    carryoverValues = new DataValue[config.getCarryovers().length];
+
     if(targetMode){
-      carryoverValues = new DataValue[config.getCarryovers().length];
       targetEvaluator = builder.getBasicEvaluator(record, config.getTarget());
     }
     aggNames = new SchemaPath[aggs.length];
@@ -165,6 +165,10 @@ public class CollapsingAggregateROP extends SingleInputROPBase<CollapsingAggrega
             carryoverValues[i] = carryovers[i].eval();
           }
         }
+      }else{
+        for(int i =0 ; i < carryovers.length; i++){
+          carryoverValues[i] = carryovers[i].eval();
+        }
       }
     }
     
@@ -193,7 +197,7 @@ public class CollapsingAggregateROP extends SingleInputROPBase<CollapsingAggrega
         }
       }else{
         for(int y = 0; y < carryoverNames.length; y++){
-          outputRecord.addField(carryoverNames[y], carryovers[y].eval());
+          outputRecord.addField(carryoverNames[y], carryoverValues[y]);
         }
         return true;
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
index 969df71..cbb59eb 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/OrderROP.java
@@ -6,6 +6,7 @@ import java.util.List;
 
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Direction;
+import org.apache.drill.common.logical.data.Order.NullCollation;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.ref.RecordIterator;
 import org.apache.drill.exec.ref.RecordPointer;
@@ -44,7 +45,7 @@ public class OrderROP extends AbstractBlockingOperator<Order> {
 
     for (int i = 0; i < orderings.length; i++) {
       defs[i] = new SortDefinition(builder.getBasicEvaluator(inputRecord, orderings[i].getExpr()),
-          orderings[i].getDirection() == Direction.ASC);
+          orderings[i].getDirection() == Direction.ASC, orderings[i].getNullCollation() == NullCollation.NULLS_LAST);
     }
   }
 
@@ -87,9 +88,10 @@ public class OrderROP extends AbstractBlockingOperator<Order> {
     boolean nullsLast;
     BasicEvaluator evaluator;
 
-    public SortDefinition(BasicEvaluator evaluator, boolean forward) {
+    public SortDefinition(BasicEvaluator evaluator, boolean forward, boolean nullsLast) {
       this.evaluator = evaluator;
       this.forward = forward;
+      this.nullsLast = nullsLast;
     }
   }
 
@@ -128,13 +130,13 @@ public class OrderROP extends AbstractBlockingOperator<Order> {
         boolean asc = defs[i].forward;
         DataValue dv1 = v1.values[i];
         DataValue dv2 = v2.values[i];
-        if (dv1 == null) {
-          if (dv2 == null) {
+        if (dv1 == DataValue.NULL_VALUE) {
+          if (dv2 == DataValue.NULL_VALUE) {
             result = 0;
           } else {
             result = nullLast ? 1 : -1;
           }
-        } else if (dv2 == null) {
+        } else if (dv2 == DataValue.NULL_VALUE) {
           result = nullLast ? -1 : 1;
         } else {
           if (dv1 instanceof ComparableValue && ((ComparableValue) dv1).supportsCompare(dv2)) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
index 623e752..f8976b2 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rse/QueueRSE.java
@@ -40,9 +40,11 @@ public class QueueRSE extends RSEBase {
 
   private DrillConfig dConfig;
   private final List<Queue<Object>> sinkQueues;
+  private final QueueRSEConfig engineConfig;
   
   public QueueRSE(QueueRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
     this.dConfig = dConfig;
+    this.engineConfig = engineConfig;
     sinkQueues = Collections.singletonList( (Queue<Object>) (new ArrayBlockingQueue<Object>(100)));
   }
 
@@ -52,15 +54,28 @@ public class QueueRSE extends RSEBase {
   
   @JsonTypeName("queue")
   public static class QueueRSEConfig extends StorageEngineConfigBase {
+    
+    public static enum Encoding {JSON, RECORD};
+    
+    private final Encoding encoding;
+    
     @JsonCreator
-    public QueueRSEConfig(@JsonProperty("name") String name) {
+    public QueueRSEConfig(@JsonProperty("name") String name, @JsonProperty("encoding") Encoding encoding) {
       super(name);
+      this.encoding = encoding == null ? Encoding.JSON : encoding;
+    }
+
+    public Encoding getEncoding() {
+      return encoding;
     }
+    
+    
   }
   
   public static class QueueOutputInfo{
     public int number;
   }
+  
 
   public boolean supportsWrite() {
     return true;
@@ -89,12 +104,19 @@ public class QueueRSE extends RSEBase {
 
     @Override
     public long recordRecord(RecordPointer r) throws IOException {
-      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      final JSONDataWriter writer = new JSONDataWriter(baos);
-      r.write(writer);
-      writer.finish();
-      queue.add(baos.toByteArray());
-      return 0;
+      switch(engineConfig.encoding){
+      case RECORD:
+        queue.add(r.copy());
+        return 0;
+      default:
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final JSONDataWriter writer = new JSONDataWriter(baos);
+        r.write(writer);
+        writer.finish();
+        queue.add(baos.toByteArray());
+        return 0;
+      }
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
index 4df146c..d401927 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/values/ScalarValues.java
@@ -518,7 +518,7 @@ public final class ScalarValues {
 
     @Override
     public String toString() {
-      return "NullValue []";
+      return "NullValue";
     }
 
     @Override
@@ -534,7 +534,7 @@ public final class ScalarValues {
 
     @Override
     public DataValue copy() {
-      return new NullValue();
+      return this;
     }
     
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/TestUtils.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/TestUtils.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/TestUtils.java
index b942538..fe9c239 100644
--- a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/TestUtils.java
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/TestUtils.java
@@ -1,25 +1,28 @@
 package org.apache.drill.exec.ref;
 
-import com.google.common.base.Charsets;
-import com.google.common.collect.Queues;
-import com.google.common.io.Files;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
 import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
 import org.apache.drill.exec.ref.rse.JSONRecordReader;
 import org.apache.drill.exec.ref.rse.RSERegistry;
-import org.codehaus.jackson.node.ArrayNode;
 
-import java.io.*;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 
 public class TestUtils {
   public static RecordIterator jsonToRecordIterator(String schemaPath, String j) throws IOException {
@@ -46,15 +49,56 @@ public class TestUtils {
    * @throws Exception
    */
   public static void assertProduceCount(String resourcePath, int recordCount) throws Exception {
-    DrillConfig config = DrillConfig.create();
-    final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
-    config.setSinkQueues(0, queue);
+    DrillConfig config = getConfigWithQueue(0);
+    Collection<RunOutcome> outcomes = getOutcome(config, resourcePath);
+    assertEquals(outcomes.iterator().next().records, recordCount);
+  }
+
+  /**
+   * Runs a logical query plan and returns output
+   * @param config DrillConfig to be utilized.
+   * @param resourcePath Path for JSON logical plan
+   * @return A collection of RunOutcomes
+   * @throws IOException
+   */
+  public static Collection<RunOutcome> getOutcome(DrillConfig config, String resourcePath) throws IOException{
     LogicalPlan plan = LogicalPlan.parse(config, Files.toString(FileUtils.getResourceAsFile(resourcePath), Charsets.UTF_8));
     IteratorRegistry ir = new IteratorRegistry();
     ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
     i.setup();
-    Collection<RunOutcome> outcomes = i.run();
-    assertEquals(outcomes.iterator().next().records, recordCount);
+    return i.run();
+  }
+  
+  private static DrillConfig getConfigWithQueue(int queueNumber){
+    DrillConfig config = DrillConfig.create();
+    final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
+    config.setSinkQueues(queueNumber, queue);
+    return config;
   }
+  
+  
+  
+  public static List<UnbackedRecord> getResultAsUnbackedRecords(String resourcePath) throws Exception{
+    DrillConfig config = getConfigWithQueue(0);
+    Collection<RunOutcome> outcomes = getOutcome(config, resourcePath);
+    if(outcomes.size() != 1) throw new Exception("Only supports logical plans that provide a single outcome.");
+    RunOutcome out = outcomes.iterator().next();
+    switch(out.outcome){
+    case CANCELED:
+    case FAILED:
+      if(out.exception != null) throw (Exception) out.exception;
+      throw new Exception("Failure while running query.");
+    }
+    Object o;
+    Queue<Object> q = config.getQueue(0);
+    List<UnbackedRecord> records = Lists.newArrayList();
+    while( (o = q.poll()) != null){
+      if(o instanceof OutcomeType) continue;
+      if( !(o instanceof UnbackedRecord) ) throw new Exception(String.format("This method only works when the QueueRSE is configured to use RECORD encoding.  One of the queue objects was of type %s", o.getClass().getCanonicalName()));
+      records.add( (UnbackedRecord) o);
+    }
 
+    return records;
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/CollapsingAggregateTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/CollapsingAggregateTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/CollapsingAggregateTest.java
new file mode 100644
index 0000000..edc92a4
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/CollapsingAggregateTest.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.TestUtils;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.junit.Test;
+
+public class CollapsingAggregateTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CollapsingAggregateTest.class);
+  
+  
+  @Test
+  public void checkNullsHandling() throws Exception{
+    List<UnbackedRecord> records = TestUtils.getResultAsUnbackedRecords("/collapse/test1.json");
+
+    DataValue[] depts = {DataValue.NULL_VALUE, new LongScalar(31), new LongScalar(33), new LongScalar(34)};
+    DataValue[] cnts = {new LongScalar(1), new LongScalar(1), new LongScalar(2), new LongScalar(2)};
+    SchemaPath typeCount = new SchemaPath("typeCount");
+    SchemaPath dept = new SchemaPath("deptId");
+    for(int i =0; i < depts.length; i++){
+      UnbackedRecord r = records.get(i);
+      assertEquals(String.format("Invalid dept value for record %d.", i), depts[i], r.getField(dept));
+      assertEquals(String.format("Invalid type count value for record %d with deptId %s.", i, depts[i]), cnts[i], r.getField(typeCount));
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/OrderROPTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/OrderROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/OrderROPTest.java
new file mode 100644
index 0000000..23d0fe6
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/OrderROPTest.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rops;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.TestUtils;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.junit.Test;
+
+public class OrderROPTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderROPTest.class);
+  
+  
+  @Test
+  public void checkNullsFirst() throws Exception{
+    List<UnbackedRecord> records = TestUtils.getResultAsUnbackedRecords("/order/nulls-first.json");
+
+    DataValue[] depts = {DataValue.NULL_VALUE, new LongScalar(31), new LongScalar(33), new LongScalar(34)};
+    SchemaPath dept = new SchemaPath("deptId");
+    for(int i =0; i < depts.length; i++){
+      UnbackedRecord r = records.get(i);
+      assertEquals(String.format("Invalid dept value for record %d.", i), depts[i], r.getField(dept));
+    }
+    
+  }
+  
+  @Test
+  public void checkNullsLast() throws Exception{
+    List<UnbackedRecord> records = TestUtils.getResultAsUnbackedRecords("/order/nulls-last.json");
+
+    DataValue[] depts = {new LongScalar(31), new LongScalar(33), new LongScalar(34), DataValue.NULL_VALUE};
+    SchemaPath dept = new SchemaPath("deptId");
+    for(int i =0; i < depts.length; i++){
+      UnbackedRecord r = records.get(i);
+      assertEquals(String.format("Invalid dept value for record %d.", i), depts[i], r.getField(dept));
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/test/resources/collapse/test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/collapse/test1.json b/sandbox/prototype/exec/ref/src/test/resources/collapse/test1.json
new file mode 100644
index 0000000..43ef233
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/collapse/test1.json
@@ -0,0 +1,71 @@
+{
+  "head" : {
+    "type" : "apache_drill_logical_plan",
+    "version" : 1,
+    "generator" : {
+      "type" : "manual",
+      "info" : "na"
+    }
+  },
+  "storage" : [ {
+    "type" : "queue",
+    "name" : "queue",
+    "encoding" : "RECORD"
+  }, {
+    "type" : "classpath",
+    "name" : "donuts-json"
+  } ],
+  "query" : [ {
+    "op" : "scan",
+    "@id" : 1,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/employees.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 1,
+    "@id" : 2,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  },  {
+    op: "segment",
+    "input" : 2,
+    "@id" : 3,
+    ref: "segment",
+    exprs: ["deptId"]
+  }, {
+    "input" : 3,
+    "@id" : 4,
+    op: "collapsingaggregate",
+    within: "segment",
+    carryovers: [ "deptId" ],
+    aggregations: [
+                { ref: "typeCount",  expr: "count(1)" }
+              ]
+  }, 
+  {
+    op: "order",
+    "input" : 4,
+    "@id" : 5,
+    orderings: [
+    	{order: "asc", expr: "deptId"}
+    ]
+  },
+  {
+    "op" : "store",
+    "input" : 5,
+    "@id" : 6,
+    "memo" : "output sink",
+    "target" : {
+      "number" : 0
+    },
+    "partition" : null,
+    "storageEngine" : "queue"
+  } ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/test/resources/order/nulls-first.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/order/nulls-first.json b/sandbox/prototype/exec/ref/src/test/resources/order/nulls-first.json
new file mode 100644
index 0000000..a0810e5
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/order/nulls-first.json
@@ -0,0 +1,71 @@
+{
+  "head" : {
+    "type" : "apache_drill_logical_plan",
+    "version" : 1,
+    "generator" : {
+      "type" : "manual",
+      "info" : "na"
+    }
+  },
+  "storage" : [ {
+    "type" : "queue",
+    "name" : "queue",
+    "encoding" : "RECORD"
+  }, {
+    "type" : "classpath",
+    "name" : "donuts-json"
+  } ],
+  "query" : [ {
+    "op" : "scan",
+    "@id" : 1,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/employees.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 1,
+    "@id" : 2,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  },  {
+    op: "segment",
+    "input" : 2,
+    "@id" : 3,
+    ref: "segment",
+    exprs: ["deptId"]
+  }, {
+    "input" : 3,
+    "@id" : 4,
+    op: "collapsingaggregate",
+    within: "segment",
+    carryovers: [ "deptId" ],
+    aggregations: [
+                { ref: "typeCount",  expr: "count(1)" }
+              ]
+  }, 
+  {
+    op: "order",
+    "input" : 4,
+    "@id" : 5,
+    orderings: [
+    	{order: "asc", expr: "deptId", nullCollation: "first"}
+    ]
+  },
+  {
+    "op" : "store",
+    "input" : 5,
+    "@id" : 6,
+    "memo" : "output sink",
+    "target" : {
+      "number" : 0
+    },
+    "partition" : null,
+    "storageEngine" : "queue"
+  } ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0f8cccb3/sandbox/prototype/exec/ref/src/test/resources/order/nulls-last.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/order/nulls-last.json b/sandbox/prototype/exec/ref/src/test/resources/order/nulls-last.json
new file mode 100644
index 0000000..117a562
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/resources/order/nulls-last.json
@@ -0,0 +1,71 @@
+{
+  "head" : {
+    "type" : "apache_drill_logical_plan",
+    "version" : 1,
+    "generator" : {
+      "type" : "manual",
+      "info" : "na"
+    }
+  },
+  "storage" : [ {
+    "type" : "queue",
+    "name" : "queue",
+    "encoding" : "RECORD"
+  }, {
+    "type" : "classpath",
+    "name" : "donuts-json"
+  } ],
+  "query" : [ {
+    "op" : "scan",
+    "@id" : 1,
+    "memo" : "initial_scan",
+    "storageengine" : "donuts-json",
+    "selection" : {
+      "path" : "/employees.json",
+      "type" : "JSON"
+    },
+    "ref" : "_MAP"
+  }, {
+    "op" : "project",
+    "input" : 1,
+    "@id" : 2,
+    "projections" : [ {
+      "ref" : "output.deptId",
+      "expr" : "_MAP.deptId"
+    } ]
+  },  {
+    op: "segment",
+    "input" : 2,
+    "@id" : 3,
+    ref: "segment",
+    exprs: ["deptId"]
+  }, {
+    "input" : 3,
+    "@id" : 4,
+    op: "collapsingaggregate",
+    within: "segment",
+    carryovers: [ "deptId" ],
+    aggregations: [
+                { ref: "typeCount",  expr: "count(1)" }
+              ]
+  }, 
+  {
+    op: "order",
+    "input" : 4,
+    "@id" : 5,
+    orderings: [
+    	{order: "asc", expr: "deptId", nullCollation: "last"}
+    ]
+  },
+  {
+    "op" : "store",
+    "input" : 5,
+    "@id" : 6,
+    "memo" : "output sink",
+    "target" : {
+      "number" : 0
+    },
+    "partition" : null,
+    "storageEngine" : "queue"
+  } ]
+}