You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2016/01/21 00:29:03 UTC

[1/3] drill git commit: DRILL-4182 Set type to LATE when values are null.

Repository: drill
Updated Branches:
  refs/heads/master 88ea7a25a -> 2f0e3f27e


DRILL-4182 Set type to LATE when values are null.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/717adcbb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/717adcbb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/717adcbb

Branch: refs/heads/master
Commit: 717adcbb8bd076224efb564feb3ac932a6b79fd9
Parents: 34bf45f
Author: Amit Hadke <am...@gmail.com>
Authored: Thu Dec 17 14:58:30 2015 -0800
Committer: Steven Phillips <st...@dremio.com>
Committed: Wed Jan 20 14:29:16 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/drill/exec/record/SchemaUtil.java     | 2 ++
 .../drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java       | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/717adcbb/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index a9c9c96..e13e742 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -122,6 +122,8 @@ public class SchemaUtil {
         for (int i = 0; i < valueCount; i++) {
           if (!vv.getAccessor().isNull(i)) {
             u.getMutator().setType(i, type);
+          } else {
+            u.getMutator().setType(i, MinorType.LATE);
           }
         }
         for (MinorType t : field.getType().getSubTypeList()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/717adcbb/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
index 8d78c9d..d5bb2d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -204,7 +204,7 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
       .baselineValues(null, null, null, null, "200", "200")
       .baselineValues(null, null, null, null, "201", "201")
       .baselineValues(null, null, null, null, "202", "202");
-    //builder.go();
+    builder.go();
 
     // Since client can't handle new columns which are not in first batch, we won't test output of query.
     // Query should run w/o any errors.


[2/3] drill git commit: DRILL-4182 Take care of null comparisons.

Posted by sm...@apache.org.
DRILL-4182 Take care of null comparisons.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/34bf45f1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/34bf45f1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/34bf45f1

Branch: refs/heads/master
Commit: 34bf45f13c1fd23628ef39fb510df2b4f5750b3b
Parents: 88ea7a2
Author: Amit Hadke <am...@gmail.com>
Authored: Wed Dec 16 17:19:13 2015 -0800
Committer: Steven Phillips <st...@dremio.com>
Committed: Wed Jan 20 14:29:16 2016 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/record/SchemaUtil.java    |  6 ++++--
 .../physical/impl/TopN/TestTopNSchemaChanges.java   | 16 +++++++++-------
 .../src/main/codegen/templates/UnionVector.java     |  6 ++++--
 3 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/34bf45f1/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 48f0a36..a9c9c96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -117,10 +117,12 @@ public class SchemaUtil {
         ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
         Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
         UnionVector u = (UnionVector) newVector;
-        u.addVector(tp.getTo());
+        final ValueVector vv = u.addVector(tp.getTo());
         MinorType type = v.getField().getType().getMinorType();
         for (int i = 0; i < valueCount; i++) {
-          u.getMutator().setType(i, type);
+          if (!vv.getAccessor().isNull(i)) {
+            u.getMutator().setType(i, type);
+          }
         }
         for (MinorType t : field.getType().getSubTypeList()) {
           if (u.getField().getType().getSubTypeList().contains(t)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/34bf45f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
index 0f65bab..8d78c9d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.TopN;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.TestBuilder;
-import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.BufferedWriter;
@@ -155,7 +155,7 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
   public void testMissingColumn() throws Exception {
     final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
     data_dir.mkdirs();
-    System.out.println(data_dir);
+
     BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
     for (int i = 0; i < 100; i++) {
       writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
@@ -166,6 +166,7 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
       writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
     }
     writer.close();
+
     writer = new BufferedWriter(new FileWriter(new File(data_dir, "d3.json")));
     for (int i = 200; i < 300; i++) {
       writer.write(String.format("{ \"kl2\" : \"%s\" , \"vl2\": \"%s\" }\n", i, i));
@@ -194,16 +195,17 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
       .baselineValues(null, null, 2l, 2l, null, null);
     builder.go();
 
-    query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 desc limit 3", data_dir.toPath().toString());
+    query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 limit 3", data_dir.toPath().toString());
     builder = testBuilder()
       .sqlQuery(query)
       .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
       .ordered()
       .baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
-      .baselineValues(null, null, null, null, "299", "299")
-      .baselineValues(null, null, null, null, "298", "298")
-      .baselineValues(null, null, null, null, "297", "297");
-    builder.go();
+      .baselineValues(null, null, null, null, "200", "200")
+      .baselineValues(null, null, null, null, "201", "201")
+      .baselineValues(null, null, null, null, "202", "202");
+    //builder.go();
+
     // Since client can't handle new columns which are not in first batch, we won't test output of query.
     // Query should run w/o any errors.
     test(String.format("select * from dfs_test.`%s` order by kl limit 3", data_dir.toPath().toString()));

http://git-wip-us.apache.org/repos/asf/drill/blob/34bf45f1/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index 2e278b1..a5e1c0d 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -17,6 +17,7 @@
  */
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.ValueVector;
 
 <@pp.dropOutputFile />
 <@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/UnionVector.java" />
@@ -232,14 +233,15 @@ public class UnionVector implements ValueVector {
     copyFrom(inIndex, outIndex, from);
   }
 
-  public void addVector(ValueVector v) {
+  public ValueVector addVector(ValueVector v) {
     String name = v.getField().getType().getMinorType().name().toLowerCase();
     MajorType type = v.getField().getType();
     Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
-    ValueVector newVector = internalMap.addOrGet(name, type, (Class<ValueVector>) BasicTypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+    final ValueVector newVector = internalMap.addOrGet(name, type, (Class<ValueVector>) BasicTypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
     v.makeTransferPair(newVector).transfer();
     internalMap.putChild(name, newVector);
     addSubType(v.getField().getType().getMinorType());
+    return newVector;
   }
 
   private class TransferImpl implements TransferPair {


[3/3] drill git commit: DRILL-4190 Don't hold on to batches from left side of merge join.

Posted by sm...@apache.org.
DRILL-4190 Don't hold on to batches from left side of merge join.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2f0e3f27
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2f0e3f27
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2f0e3f27

Branch: refs/heads/master
Commit: 2f0e3f27e630d5ac15cdaef808564e01708c3c55
Parents: 717adcb
Author: Amit Hadke <am...@gmail.com>
Authored: Sun Jan 3 22:08:21 2016 -0800
Committer: Steven Phillips <st...@dremio.com>
Committed: Wed Jan 20 14:30:26 2016 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/join/MergeJoinBatch.java |   2 +-
 .../drill/exec/record/RecordIterator.java       |  90 +++--
 .../drill/exec/record/TestRecordIterator.java   | 346 +++++++++++++++++++
 .../resources/record/test_recorditerator.json   |  71 ++++
 4 files changed, 477 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 9ef5cde..10d0f20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -113,7 +113,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       throw new UnsupportedOperationException("Merge Join currently does not support cartesian join.  This join operator was configured with 0 conditions");
     }
     this.left = left;
-    this.leftIterator = new RecordIterator(left, this, oContext, 0);
+    this.leftIterator = new RecordIterator(left, this, oContext, 0, false);
     this.right = right;
     this.rightIterator = new RecordIterator(right, this, oContext, 1);
     this.joinType = popConfig.getJoinType();

http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index af0a753..918a8da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -25,11 +25,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
 import com.google.common.collect.TreeRangeMap;
 
 /**
@@ -54,6 +50,7 @@ public class RecordIterator implements VectorAccessible {
   private boolean lastBatchRead;    // True if all batches are consumed.
   private boolean initialized;
   private OperatorContext oContext;
+  private final boolean enableMarkAndReset;
 
   private final VectorContainer container; // Holds VectorContainer of current record batch
   private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create();
@@ -62,6 +59,14 @@ public class RecordIterator implements VectorAccessible {
                         AbstractRecordBatch<?> outgoing,
                         OperatorContext oContext,
                         int inputIndex) {
+    this(incoming, outgoing, oContext, inputIndex, true);
+  }
+
+  public RecordIterator(RecordBatch incoming,
+                        AbstractRecordBatch<?> outgoing,
+                        OperatorContext oContext,
+                        int inputIndex,
+                        boolean enableMarkAndReset) {
     this.incoming = incoming;
     this.outgoing = outgoing;
     this.inputIndex = inputIndex;
@@ -70,6 +75,7 @@ public class RecordIterator implements VectorAccessible {
     this.oContext = oContext;
     resetIndices();
     this.initialized = false;
+    this.enableMarkAndReset = enableMarkAndReset;
   }
 
   private void resetIndices() {
@@ -88,14 +94,17 @@ public class RecordIterator implements VectorAccessible {
     if (lastBatchRead) {
       return;
     }
-    lastOutcome = outgoing.next(inputIndex, incoming);
+    lastOutcome = outgoing != null ? outgoing.next(inputIndex, incoming) : incoming.next();
   }
 
   public void mark() {
+    if (!enableMarkAndReset) {
+      throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+    }
     // Release all batches before current batch. [0 to startBatchPosition).
     final Map<Range<Long>,RecordBatchData> oldBatches = batches.subRangeMap(Range.closedOpen(0l, startBatchPosition)).asMapOfRanges();
-    for (Range<Long> range : oldBatches.keySet()) {
-      oldBatches.get(range).clear();
+    for (RecordBatchData rbd : oldBatches.values()) {
+      rbd.clear();
     }
     batches.remove(Range.closedOpen(0l, startBatchPosition));
     markedInnerPosition = innerPosition;
@@ -103,12 +112,15 @@ public class RecordIterator implements VectorAccessible {
   }
 
   public void reset() {
+    if (!enableMarkAndReset) {
+      throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+    }
     if (markedOuterPosition >= 0) {
       // Move to rbd for markedOuterPosition.
       final RecordBatchData rbdNew = batches.get(markedOuterPosition);
       final RecordBatchData rbdOld = batches.get(startBatchPosition);
-      Preconditions.checkArgument(rbdOld != null);
-      Preconditions.checkArgument(rbdNew != null);
+      assert rbdOld != null;
+      assert rbdNew != null;
       if (rbdNew != rbdOld) {
         container.transferOut(rbdOld.getContainer());
         container.transferIn(rbdNew.getContainer());
@@ -125,13 +137,16 @@ public class RecordIterator implements VectorAccessible {
 
   // Move forward by delta (may cross one or more record batches)
   public void forward(long delta) {
-    Preconditions.checkArgument(delta >= 0);
-    Preconditions.checkArgument(delta + outerPosition < totalRecordCount);
+    if (!enableMarkAndReset) {
+      throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+    }
+    assert delta >= 0;
+    assert (delta + outerPosition) < totalRecordCount;
     final long nextOuterPosition = delta + outerPosition;
     final RecordBatchData rbdNew = batches.get(nextOuterPosition);
     final RecordBatchData rbdOld = batches.get(outerPosition);
-    Preconditions.checkArgument(rbdNew != null);
-    Preconditions.checkArgument(rbdOld != null);
+    assert rbdNew != null;
+    assert rbdOld != null;
     container.transferOut(rbdOld.getContainer());
     // Get vectors from new position.
     container.transferIn(rbdNew.getContainer());
@@ -172,6 +187,9 @@ public class RecordIterator implements VectorAccessible {
           // No more data, disallow reads unless reset is called.
           outerPosition = nextOuterPosition;
           lastBatchRead = true;
+          if (!enableMarkAndReset) {
+            container.clear();
+          }
           break;
         case OK_NEW_SCHEMA:
         case OK:
@@ -193,14 +211,19 @@ public class RecordIterator implements VectorAccessible {
             initialized = true;
           }
           if (innerRecordCount > 0) {
-            // Transfer vectors back to old batch.
-            if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
-              container.transferOut(batches.get(outerPosition).getContainer());
+            if (enableMarkAndReset) {
+              // Transfer vectors back to old batch.
+              if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
+                container.transferOut(batches.get(outerPosition).getContainer());
+              }
+              container.transferIn(rbd.getContainer());
+              batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
+            } else {
+              container.zeroVectors();
+              container.transferIn(rbd.getContainer());
             }
-            container.transferIn(rbd.getContainer());
-            startBatchPosition = nextOuterPosition;
-            batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
             innerPosition = 0;
+            startBatchPosition = nextOuterPosition;
             outerPosition = nextOuterPosition;
             totalRecordCount += innerRecordCount;
           } else {
@@ -216,12 +239,13 @@ public class RecordIterator implements VectorAccessible {
       }
     } else {
       if (nextInnerPosition >= innerRecordCount) {
+        assert enableMarkAndReset;
         // move to next batch
         final RecordBatchData rbdNew = batches.get(nextOuterPosition);
         final RecordBatchData rbdOld = batches.get(outerPosition);
-        Preconditions.checkArgument(rbdNew != null);
-        Preconditions.checkArgument(rbdOld != null);
-        Preconditions.checkArgument(rbdOld != rbdNew);
+        assert rbdNew != null;
+        assert rbdOld != null;
+        assert rbdOld != rbdNew;
         container.transferOut(rbdOld.getContainer());
         container.transferIn(rbdNew.getContainer());
         innerPosition = 0;
@@ -257,40 +281,44 @@ public class RecordIterator implements VectorAccessible {
   }
 
   public int getCurrentPosition() {
-    Preconditions.checkArgument(initialized);
-    Preconditions.checkArgument(innerPosition >= 0 && innerPosition < innerRecordCount,
-      String.format("innerPosition:%d, outerPosition:%d, innerRecordCount:%d, totalRecordCount:%d",
-        innerPosition, outerPosition, innerRecordCount, totalRecordCount));
+    assert initialized;
+    assert innerPosition >= 0;
+    assert innerPosition < innerRecordCount;
     return innerPosition;
   }
 
+  // Test purposes only.
+  public Map<Range<Long>, RecordBatchData> cachedBatches() {
+    return batches.asMapOfRanges();
+  }
+
   @Override
   public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
-    Preconditions.checkArgument(initialized);
+    assert initialized;
     return container.getValueAccessorById(clazz, ids);
   }
 
   @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
-    Preconditions.checkArgument(initialized);
+    assert initialized;
     return container.getValueVectorId(path);
   }
 
   @Override
   public BatchSchema getSchema() {
-    Preconditions.checkArgument(initialized);
+    assert initialized;
     return container.getSchema();
   }
 
   @Override
   public int getRecordCount() {
-    Preconditions.checkArgument(initialized);
+    assert initialized;
     return innerRecordCount;
   }
 
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
-    Preconditions.checkArgument(initialized);
+    assert initialized;
     return container.iterator();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
new file mode 100644
index 0000000..f892f0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.compile.CodeCompilerTestFactory;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import java.util.List;
+
+public class TestRecordIterator extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestRecordIterator.class);
+  DrillConfig c = DrillConfig.create();
+
+  @Test
+  public void testSimpleIterator(@Injectable final DrillbitContext bitContext,
+                                  @Injectable UserServer.UserClientConnection connection) throws Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry();
+      bitContext.getAllocator(); result = RootAllocatorFactory.newRoot(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(ClassPathScanner.fromPrescan(c));
+      bitContext.getConfig(); result = c;
+      bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
+    }};
+
+    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
+
+    final String planStr = Files.toString(FileUtils.getResourceAsFile("/record/test_recorditerator.json"), Charsets.UTF_8);
+
+    final PhysicalPlan plan = reader.readPhysicalPlan(planStr);
+    final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    final FragmentContext context = new FragmentContext(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
+    final List<PhysicalOperator> operatorList = plan.getSortedOperators(false);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) operatorList.iterator().next()));
+
+    RecordBatch singleBatch = exec.getIncoming();
+    PhysicalOperator dummyPop = operatorList.iterator().next();
+    OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+      OperatorContext.getChildCount(dummyPop));
+    OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
+    RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false);
+    int totalRecords = 0;
+    List<ValueVector> vectors = null;
+
+    while (true) {
+      iter.next();
+      if (iter.finished()) {
+        break;
+      } else {
+        // First time save vectors.
+        if (vectors == null) {
+          vectors = Lists.newArrayList();
+          for (VectorWrapper vw : iter) {
+            vectors.add(vw.getValueVector());
+          }
+        }
+        final int position = iter.getCurrentPosition();
+        if (position %2 == 0 ) {
+          assertTrue(checkValues(vectors, position));
+        } else {
+          assertTrue(checkValues(vectors, position));
+        }
+        totalRecords++;
+      }
+      assertEquals(0, iter.cachedBatches().size());
+    }
+    assertEquals(11112, totalRecords);
+    try {
+      iter.mark();
+      assertTrue(false);
+    } catch (UnsupportedOperationException e) {}
+    try {
+      iter.reset();
+      assertTrue(false);
+    } catch (UnsupportedOperationException e) {}
+  }
+
+  @Test
+  public void testMarkResetIterator(@Injectable final DrillbitContext bitContext,
+                                 @Injectable UserServer.UserClientConnection connection) throws Throwable{
+    new NonStrictExpectations(){{
+      bitContext.getMetrics(); result = new MetricRegistry();
+      bitContext.getAllocator(); result = RootAllocatorFactory.newRoot(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(ClassPathScanner.fromPrescan(c));
+      bitContext.getConfig(); result = c;
+      bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
+    }};
+
+    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
+
+    final String planStr = Files.toString(FileUtils.getResourceAsFile("/record/test_recorditerator.json"), Charsets.UTF_8);
+
+    final PhysicalPlan plan = reader.readPhysicalPlan(planStr);
+    final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    final FragmentContext context = new FragmentContext(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
+    final List<PhysicalOperator> operatorList = plan.getSortedOperators(false);
+    SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) operatorList.iterator().next()));
+
+    RecordBatch singleBatch = exec.getIncoming();
+    PhysicalOperator dummyPop = operatorList.iterator().next();
+    OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+      OperatorContext.getChildCount(dummyPop));
+    OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
+    RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0);
+    List<ValueVector> vectors = null;
+    // batche sizes
+    // 1, 100, 10, 10000, 1, 1000
+    // total = 11112
+
+    // BATCH 1 : 1, starting outerposition: 0
+    iter.next();
+    assertFalse(iter.finished());
+    assertEquals(1, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(0, iter.getOuterPosition());
+    assertEquals(1, iter.cachedBatches().size());
+    vectors = Lists.newArrayList();
+    for (VectorWrapper vw : iter) {
+      vectors.add(vw.getValueVector());
+    }
+    // mark at position 0
+    iter.mark();
+    checkValues(vectors, 0);
+
+    // BATCH 2: 100, starting outerposition: 1
+    iter.next();
+    assertFalse(iter.finished());
+    assertEquals(101, iter.getTotalRecordCount(), 101);
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(100, iter.getInnerRecordCount());
+    assertEquals(1, iter.getOuterPosition());
+    assertEquals(2, iter.cachedBatches().size());
+    for (int i = 0; i < 100; i++) {
+      checkValues(vectors, i);
+      iter.next();
+    }
+
+    // BATCH 3 :10, starting outerposition: 101
+    assertFalse(iter.finished());
+    assertEquals(111, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(10, iter.getInnerRecordCount());
+    assertEquals(101, iter.getOuterPosition());
+    assertEquals(3, iter.cachedBatches().size());
+    for (int i = 0; i < 10; i++) {
+      checkValues(vectors, i);
+      iter.next();
+    }
+
+    // BATCH 4 : 10000, starting outerposition: 111
+    assertFalse(iter.finished());
+    assertEquals(10111, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition(), 0);
+    assertEquals(10000, iter.getInnerRecordCount());
+    assertEquals(111, iter.getOuterPosition());
+    assertEquals(4, iter.cachedBatches().size());
+    for (int i = 0; i < 10000; i++) {
+      checkValues(vectors, i);
+      iter.next();
+    }
+
+    // BATCH 5 : 1, starting outerposition: 10111
+    assertFalse(iter.finished());
+    assertEquals(10112, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1, iter.getInnerRecordCount());
+    assertEquals(10111, iter.getOuterPosition());
+    assertEquals(5, iter.cachedBatches().size());
+    checkValues(vectors, 0);
+    iter.next();
+
+    // BATCH 6 : 1000, starting outerposition: 10112
+    assertFalse(iter.finished());
+    assertEquals(11112, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1000, iter.getInnerRecordCount());
+    assertEquals(10112, iter.getOuterPosition());
+    assertEquals(6, iter.cachedBatches().size());
+    for (int i = 0; i < 1000; i++) {
+      checkValues(vectors, i);
+      iter.next();
+    }
+    assertTrue(iter.finished());
+    assertEquals(6, iter.cachedBatches().size());
+
+    // back to batch 1
+    iter.reset();
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(6, iter.cachedBatches().size());
+    assertEquals(iter.getCurrentPosition(), 0);
+    assertEquals(1, iter.getInnerRecordCount());
+    checkValues(vectors, 0);
+
+    iter.next();
+    // mark start of batch 2
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(5, iter.cachedBatches().size());
+    assertEquals(iter.getCurrentPosition(), 0);
+    assertEquals(100, iter.getInnerRecordCount());
+    for (int i = 0; i < 100; i++) {
+      iter.next();
+    }
+
+    // mark start of batch 3
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(4, iter.cachedBatches().size());
+    assertEquals(iter.getCurrentPosition(), 0);
+    assertEquals(10, iter.getInnerRecordCount());
+    for (int i = 0; i < 10; i++) {
+      iter.next();
+    }
+
+    // jump into middle of largest batch #4.
+    for (int i = 0; i < 5000; i++) {
+      iter.next();
+    }
+    assertEquals(4, iter.cachedBatches().size());
+    iter.mark();
+    assertEquals(3, iter.cachedBatches().size());
+    for (int i = 0; i < 5000; i++) {
+      iter.next();
+    }
+
+    // mark start of batch 5
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(2, iter.cachedBatches().size());
+    assertEquals(iter.getCurrentPosition(), 0);
+    assertEquals(1, iter.getInnerRecordCount());
+
+    // move to last batch
+    iter.next();
+    // skip to the middle of last batch
+    for (int i = 0; i < 500; i++) {
+      iter.next();
+    }
+    checkValues(vectors, 499);
+    checkValues(vectors, 500);
+    iter.reset();
+    checkValues(vectors, 0);
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(2, iter.cachedBatches().size());
+    assertEquals(iter.getCurrentPosition(), 0);
+    assertEquals(1, iter.getInnerRecordCount());
+    // move to last batch
+    iter.next();
+    assertEquals(0, iter.getCurrentPosition());
+    for (int i = 0; i < 500; i++) {
+      iter.next();
+    }
+    // This should free 5th batch.
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(1, iter.cachedBatches().size());
+    assertEquals(500, iter.getCurrentPosition());
+    assertEquals(1000, iter.getInnerRecordCount());
+    // go to the end of iterator
+    for (int i = 0; i < 500; i++) {
+      iter.next();
+    }
+    assertTrue(iter.finished());
+    iter.reset();
+    assertFalse(iter.finished());
+    assertEquals(iter.getTotalRecordCount(), 11112);
+    assertEquals(1, iter.cachedBatches().size());
+    assertEquals(500, iter.getCurrentPosition());
+    assertEquals(1000, iter.getInnerRecordCount());
+    iter.close();
+    assertEquals(0, iter.cachedBatches().size());
+  }
+
+  /**
+   * Checks that every vector holds the value expected at {@code position}: {@code Integer.MIN_VALUE}
+   * for even positions and {@code Integer.MAX_VALUE} for odd positions.
+   * NOTE(review): this alternation presumably mirrors the data produced by the mock sub-scan; confirm.
+   *
+   * <p>If a vector yields something other than an {@code Integer} (including {@code null}), a
+   * diagnostic line is printed, the result becomes {@code false} and the remaining vectors are
+   * not inspected.
+   *
+   * @param vectors  the value vectors to inspect
+   * @param position the record index to read from each vector
+   * @return {@code true} if every vector holds the expected integer at {@code position}
+   */
+  private static boolean checkValues(List<ValueVector> vectors, int position) {
+    boolean result = true;
+    final int expected = (position % 2 == 0) ? Integer.MIN_VALUE : Integer.MAX_VALUE;
+    for (ValueVector vv : vectors) {
+      final Object o = vv.getAccessor().getObject(position);
+      if (o instanceof Integer) {
+        // unboxes and compares by value, not by reference
+        result &= ((Integer) o) == expected;
+      } else {
+        // o may be null here (instanceof is false for null); avoid an NPE that would hide the
+        // real mismatch. "%s" renders a null argument as "null".
+        final Object foundType = (o == null) ? null : o.getClass();
+        System.out.println(String.format("Found wrong type %s at position %d", foundType, position));
+        result = false;
+        break;
+      }
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/test/resources/record/test_recorditerator.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/record/test_recorditerator.json b/exec/java-exec/src/test/resources/record/test_recorditerator.json
new file mode 100644
index 0000000..057466b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/record/test_recorditerator.json
@@ -0,0 +1,71 @@
+{
+    head:{
+      type:"APACHE_DRILL_PHYSICAL",
+      version:"1",
+      generator:{
+        type:"manual"
+      }
+    },
+    graph: [
+      {
+        @id:1,
+        pop:"mock-sub-scan",
+        url: "http://apache.org",
+        entries:[
+          {
+            records: 1,
+            types: [
+              {name: "k", type: "INT", mode: "REQUIRED"},
+              {name: "v", type: "INT", mode: "REQUIRED"}
+            ]
+          },
+          {
+            records: 100,
+            types: [
+            {name: "k", type: "INT", mode: "REQUIRED"},
+            {name: "v", type: "INT", mode: "REQUIRED"}
+            ]
+          },
+          {
+            records: 10,
+            types: [
+              {name: "k", type: "INT", mode: "REQUIRED"},
+              {name: "v", type: "INT", mode: "REQUIRED"}
+            ]
+          },
+          {
+            records: 10000,
+            types: [
+              {name: "k", type: "INT", mode: "REQUIRED"},
+              {name: "v", type: "INT", mode: "REQUIRED"}
+            ]
+          },
+          {
+            records: 1,
+            types: [
+              {name: "k", type: "INT", mode: "REQUIRED"},
+              {name: "v", type: "INT", mode: "REQUIRED"}
+            ]
+          },
+          {
+            records: 1000,
+            types: [
+              {name: "k", type: "INT", mode: "REQUIRED"},
+              {name: "v", type: "INT", mode: "REQUIRED"}
+            ]
+          }
+        ]
+      },
+      {
+        @id:2,
+        child:1,
+        pop:"project",
+        exprs:[ { ref : "`*`", expr : "`*`"} ]
+      },
+      {
+        @id:3,
+        child:2,
+        pop:"screen"
+      }
+    ]
+}