You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2016/01/21 00:29:03 UTC
[1/3] drill git commit: DRILL-4182 Set type to LATE when values are nulls.
Repository: drill
Updated Branches:
refs/heads/master 88ea7a25a -> 2f0e3f27e
DRILL-4182 Set type to LATE when values are nulls.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/717adcbb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/717adcbb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/717adcbb
Branch: refs/heads/master
Commit: 717adcbb8bd076224efb564feb3ac932a6b79fd9
Parents: 34bf45f
Author: Amit Hadke <am...@gmail.com>
Authored: Thu Dec 17 14:58:30 2015 -0800
Committer: Steven Phillips <st...@dremio.com>
Committed: Wed Jan 20 14:29:16 2016 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/drill/exec/record/SchemaUtil.java | 2 ++
.../drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java | 2 +-
2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/717adcbb/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index a9c9c96..e13e742 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -122,6 +122,8 @@ public class SchemaUtil {
for (int i = 0; i < valueCount; i++) {
if (!vv.getAccessor().isNull(i)) {
u.getMutator().setType(i, type);
+ } else {
+ u.getMutator().setType(i, MinorType.LATE);
}
}
for (MinorType t : field.getType().getSubTypeList()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/717adcbb/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
index 8d78c9d..d5bb2d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -204,7 +204,7 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
.baselineValues(null, null, null, null, "200", "200")
.baselineValues(null, null, null, null, "201", "201")
.baselineValues(null, null, null, null, "202", "202");
- //builder.go();
+ builder.go();
// Since client can't handle new columns which are not in first batch, we won't test output of query.
// Query should run w/o any errors.
[2/3] drill git commit: DRILL-4182 Take care of null comparisons.
Posted by sm...@apache.org.
DRILL-4182 Take care of null comparisons.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/34bf45f1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/34bf45f1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/34bf45f1
Branch: refs/heads/master
Commit: 34bf45f13c1fd23628ef39fb510df2b4f5750b3b
Parents: 88ea7a2
Author: Amit Hadke <am...@gmail.com>
Authored: Wed Dec 16 17:19:13 2015 -0800
Committer: Steven Phillips <st...@dremio.com>
Committed: Wed Jan 20 14:29:16 2016 -0800
----------------------------------------------------------------------
.../org/apache/drill/exec/record/SchemaUtil.java | 6 ++++--
.../physical/impl/TopN/TestTopNSchemaChanges.java | 16 +++++++++-------
.../src/main/codegen/templates/UnionVector.java | 6 ++++--
3 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/34bf45f1/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 48f0a36..a9c9c96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -117,10 +117,12 @@ public class SchemaUtil {
ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator());
Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector");
UnionVector u = (UnionVector) newVector;
- u.addVector(tp.getTo());
+ final ValueVector vv = u.addVector(tp.getTo());
MinorType type = v.getField().getType().getMinorType();
for (int i = 0; i < valueCount; i++) {
- u.getMutator().setType(i, type);
+ if (!vv.getAccessor().isNull(i)) {
+ u.getMutator().setType(i, type);
+ }
}
for (MinorType t : field.getType().getSubTypeList()) {
if (u.getField().getType().getSubTypeList().contains(t)) {
http://git-wip-us.apache.org/repos/asf/drill/blob/34bf45f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
index 0f65bab..8d78c9d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNSchemaChanges.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.TopN;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.TestBuilder;
-import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.BufferedWriter;
@@ -155,7 +155,7 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
public void testMissingColumn() throws Exception {
final File data_dir = new File(BaseTestQuery.getTempDir("topn-schemachanges"));
data_dir.mkdirs();
- System.out.println(data_dir);
+
BufferedWriter writer = new BufferedWriter(new FileWriter(new File(data_dir, "d1.json")));
for (int i = 0; i < 100; i++) {
writer.write(String.format("{ \"kl1\" : %d , \"vl1\": %d }\n", i, i));
@@ -166,6 +166,7 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
writer.write(String.format("{ \"kl\" : %f , \"vl\": %f }\n", (float)i, (float)i));
}
writer.close();
+
writer = new BufferedWriter(new FileWriter(new File(data_dir, "d3.json")));
for (int i = 200; i < 300; i++) {
writer.write(String.format("{ \"kl2\" : \"%s\" , \"vl2\": \"%s\" }\n", i, i));
@@ -194,16 +195,17 @@ public class TestTopNSchemaChanges extends BaseTestQuery {
.baselineValues(null, null, 2l, 2l, null, null);
builder.go();
- query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 desc limit 3", data_dir.toPath().toString());
+ query = String.format("select kl, vl, kl1, vl1, kl2, vl2 from dfs_test.`%s` order by kl2 limit 3", data_dir.toPath().toString());
builder = testBuilder()
.sqlQuery(query)
.optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true")
.ordered()
.baselineColumns("kl", "vl", "kl1", "vl1", "kl2", "vl2")
- .baselineValues(null, null, null, null, "299", "299")
- .baselineValues(null, null, null, null, "298", "298")
- .baselineValues(null, null, null, null, "297", "297");
- builder.go();
+ .baselineValues(null, null, null, null, "200", "200")
+ .baselineValues(null, null, null, null, "201", "201")
+ .baselineValues(null, null, null, null, "202", "202");
+ //builder.go();
+
// Since client can't handle new columns which are not in first batch, we won't test output of query.
// Query should run w/o any errors.
test(String.format("select * from dfs_test.`%s` order by kl limit 3", data_dir.toPath().toString()));
http://git-wip-us.apache.org/repos/asf/drill/blob/34bf45f1/exec/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index 2e278b1..a5e1c0d 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -17,6 +17,7 @@
*/
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.ValueVector;
<@pp.dropOutputFile />
<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/UnionVector.java" />
@@ -232,14 +233,15 @@ public class UnionVector implements ValueVector {
copyFrom(inIndex, outIndex, from);
}
- public void addVector(ValueVector v) {
+ public ValueVector addVector(ValueVector v) {
String name = v.getField().getType().getMinorType().name().toLowerCase();
MajorType type = v.getField().getType();
Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
- ValueVector newVector = internalMap.addOrGet(name, type, (Class<ValueVector>) BasicTypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ final ValueVector newVector = internalMap.addOrGet(name, type, (Class<ValueVector>) BasicTypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
v.makeTransferPair(newVector).transfer();
internalMap.putChild(name, newVector);
addSubType(v.getField().getType().getMinorType());
+ return newVector;
}
private class TransferImpl implements TransferPair {
[3/3] drill git commit: DRILL-4190 Don't hold on to batches from left side of merge join.
Posted by sm...@apache.org.
DRILL-4190 Don't hold on to batches from left side of merge join.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2f0e3f27
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2f0e3f27
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2f0e3f27
Branch: refs/heads/master
Commit: 2f0e3f27e630d5ac15cdaef808564e01708c3c55
Parents: 717adcb
Author: Amit Hadke <am...@gmail.com>
Authored: Sun Jan 3 22:08:21 2016 -0800
Committer: Steven Phillips <st...@dremio.com>
Committed: Wed Jan 20 14:30:26 2016 -0800
----------------------------------------------------------------------
.../exec/physical/impl/join/MergeJoinBatch.java | 2 +-
.../drill/exec/record/RecordIterator.java | 90 +++--
.../drill/exec/record/TestRecordIterator.java | 346 +++++++++++++++++++
.../resources/record/test_recorditerator.json | 71 ++++
4 files changed, 477 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 9ef5cde..10d0f20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -113,7 +113,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
}
this.left = left;
- this.leftIterator = new RecordIterator(left, this, oContext, 0);
+ this.leftIterator = new RecordIterator(left, this, oContext, 0, false);
this.right = right;
this.rightIterator = new RecordIterator(right, this, oContext, 1);
this.joinType = popConfig.getJoinType();
http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index af0a753..918a8da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -25,11 +25,7 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeMap;
/**
@@ -54,6 +50,7 @@ public class RecordIterator implements VectorAccessible {
private boolean lastBatchRead; // True if all batches are consumed.
private boolean initialized;
private OperatorContext oContext;
+ private final boolean enableMarkAndReset;
private final VectorContainer container; // Holds VectorContainer of current record batch
private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create();
@@ -62,6 +59,14 @@ public class RecordIterator implements VectorAccessible {
AbstractRecordBatch<?> outgoing,
OperatorContext oContext,
int inputIndex) {
+ this(incoming, outgoing, oContext, inputIndex, true);
+ }
+
+ public RecordIterator(RecordBatch incoming,
+ AbstractRecordBatch<?> outgoing,
+ OperatorContext oContext,
+ int inputIndex,
+ boolean enableMarkAndReset) {
this.incoming = incoming;
this.outgoing = outgoing;
this.inputIndex = inputIndex;
@@ -70,6 +75,7 @@ public class RecordIterator implements VectorAccessible {
this.oContext = oContext;
resetIndices();
this.initialized = false;
+ this.enableMarkAndReset = enableMarkAndReset;
}
private void resetIndices() {
@@ -88,14 +94,17 @@ public class RecordIterator implements VectorAccessible {
if (lastBatchRead) {
return;
}
- lastOutcome = outgoing.next(inputIndex, incoming);
+ lastOutcome = outgoing != null ? outgoing.next(inputIndex, incoming) : incoming.next();
}
public void mark() {
+ if (!enableMarkAndReset) {
+ throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+ }
// Release all batches before current batch. [0 to startBatchPosition).
final Map<Range<Long>,RecordBatchData> oldBatches = batches.subRangeMap(Range.closedOpen(0l, startBatchPosition)).asMapOfRanges();
- for (Range<Long> range : oldBatches.keySet()) {
- oldBatches.get(range).clear();
+ for (RecordBatchData rbd : oldBatches.values()) {
+ rbd.clear();
}
batches.remove(Range.closedOpen(0l, startBatchPosition));
markedInnerPosition = innerPosition;
@@ -103,12 +112,15 @@ public class RecordIterator implements VectorAccessible {
}
public void reset() {
+ if (!enableMarkAndReset) {
+ throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+ }
if (markedOuterPosition >= 0) {
// Move to rbd for markedOuterPosition.
final RecordBatchData rbdNew = batches.get(markedOuterPosition);
final RecordBatchData rbdOld = batches.get(startBatchPosition);
- Preconditions.checkArgument(rbdOld != null);
- Preconditions.checkArgument(rbdNew != null);
+ assert rbdOld != null;
+ assert rbdNew != null;
if (rbdNew != rbdOld) {
container.transferOut(rbdOld.getContainer());
container.transferIn(rbdNew.getContainer());
@@ -125,13 +137,16 @@ public class RecordIterator implements VectorAccessible {
// Move forward by delta (may cross one or more record batches)
public void forward(long delta) {
- Preconditions.checkArgument(delta >= 0);
- Preconditions.checkArgument(delta + outerPosition < totalRecordCount);
+ if (!enableMarkAndReset) {
+ throw new UnsupportedOperationException("mark and reset disabled for this RecordIterator");
+ }
+ assert delta >= 0;
+ assert (delta + outerPosition) < totalRecordCount;
final long nextOuterPosition = delta + outerPosition;
final RecordBatchData rbdNew = batches.get(nextOuterPosition);
final RecordBatchData rbdOld = batches.get(outerPosition);
- Preconditions.checkArgument(rbdNew != null);
- Preconditions.checkArgument(rbdOld != null);
+ assert rbdNew != null;
+ assert rbdOld != null;
container.transferOut(rbdOld.getContainer());
// Get vectors from new position.
container.transferIn(rbdNew.getContainer());
@@ -172,6 +187,9 @@ public class RecordIterator implements VectorAccessible {
// No more data, disallow reads unless reset is called.
outerPosition = nextOuterPosition;
lastBatchRead = true;
+ if (!enableMarkAndReset) {
+ container.clear();
+ }
break;
case OK_NEW_SCHEMA:
case OK:
@@ -193,14 +211,19 @@ public class RecordIterator implements VectorAccessible {
initialized = true;
}
if (innerRecordCount > 0) {
- // Transfer vectors back to old batch.
- if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
- container.transferOut(batches.get(outerPosition).getContainer());
+ if (enableMarkAndReset) {
+ // Transfer vectors back to old batch.
+ if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
+ container.transferOut(batches.get(outerPosition).getContainer());
+ }
+ container.transferIn(rbd.getContainer());
+ batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
+ } else {
+ container.zeroVectors();
+ container.transferIn(rbd.getContainer());
}
- container.transferIn(rbd.getContainer());
- startBatchPosition = nextOuterPosition;
- batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
innerPosition = 0;
+ startBatchPosition = nextOuterPosition;
outerPosition = nextOuterPosition;
totalRecordCount += innerRecordCount;
} else {
@@ -216,12 +239,13 @@ public class RecordIterator implements VectorAccessible {
}
} else {
if (nextInnerPosition >= innerRecordCount) {
+ assert enableMarkAndReset;
// move to next batch
final RecordBatchData rbdNew = batches.get(nextOuterPosition);
final RecordBatchData rbdOld = batches.get(outerPosition);
- Preconditions.checkArgument(rbdNew != null);
- Preconditions.checkArgument(rbdOld != null);
- Preconditions.checkArgument(rbdOld != rbdNew);
+ assert rbdNew != null;
+ assert rbdOld != null;
+ assert rbdOld != rbdNew;
container.transferOut(rbdOld.getContainer());
container.transferIn(rbdNew.getContainer());
innerPosition = 0;
@@ -257,40 +281,44 @@ public class RecordIterator implements VectorAccessible {
}
public int getCurrentPosition() {
- Preconditions.checkArgument(initialized);
- Preconditions.checkArgument(innerPosition >= 0 && innerPosition < innerRecordCount,
- String.format("innerPosition:%d, outerPosition:%d, innerRecordCount:%d, totalRecordCount:%d",
- innerPosition, outerPosition, innerRecordCount, totalRecordCount));
+ assert initialized;
+ assert innerPosition >= 0;
+ assert innerPosition < innerRecordCount;
return innerPosition;
}
+ // Test purposes only: returns the batches this iterator currently caches, keyed by record range.
+ public Map<Range<Long>, RecordBatchData> cachedBatches() {
+ return batches.asMapOfRanges();
+ }
+
@Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.getValueAccessorById(clazz, ids);
}
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.getValueVectorId(path);
}
@Override
public BatchSchema getSchema() {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.getSchema();
}
@Override
public int getRecordCount() {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return innerRecordCount;
}
@Override
public Iterator<VectorWrapper<?>> iterator() {
- Preconditions.checkArgument(initialized);
+ assert initialized;
return container.iterator();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
new file mode 100644
index 0000000..f892f0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.compile.CodeCompilerTestFactory;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * Tests {@link RecordIterator} over a mock scan of six batches (1, 100, 10, 10000, 1 and 1000
+ * records; 11112 in total) declared in /record/test_recorditerator.json.
+ */
+public class TestRecordIterator extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestRecordIterator.class);
+
+  /** Total number of records produced by the mock scan. */
+  private static final long TOTAL_RECORDS = 11112;
+
+  DrillConfig c = DrillConfig.create();
+
+  /**
+   * Reads every record with mark/reset disabled: no batch may be cached, and mark() and
+   * reset() must be rejected.
+   */
+  @Test
+  public void testSimpleIterator(@Injectable final DrillbitContext bitContext,
+                                 @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    new NonStrictExpectations() {{
+      bitContext.getMetrics(); result = new MetricRegistry();
+      bitContext.getAllocator(); result = RootAllocatorFactory.newRoot(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(ClassPathScanner.fromPrescan(c));
+      bitContext.getConfig(); result = c;
+      bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
+    }};
+
+    final RecordIterator iter = createIterator(bitContext, connection, false);
+    int totalRecords = 0;
+    List<ValueVector> vectors = null;
+    while (true) {
+      iter.next();
+      if (iter.finished()) {
+        break;
+      }
+      // Capture the vectors once; the test relies on later batches being read into the same instances.
+      if (vectors == null) {
+        vectors = collectVectors(iter);
+      }
+      assertTrue(checkValues(vectors, iter.getCurrentPosition()));
+      totalRecords++;
+      assertEquals(0, iter.cachedBatches().size());
+    }
+    assertEquals(TOTAL_RECORDS, totalRecords);
+
+    try {
+      iter.mark();
+      fail("mark() should be rejected when mark/reset is disabled");
+    } catch (UnsupportedOperationException expected) { // mark/reset was disabled at construction
+    }
+    try {
+      iter.reset();
+      fail("reset() should be rejected when mark/reset is disabled");
+    } catch (UnsupportedOperationException expected) { // mark/reset was disabled at construction
+    }
+  }
+
+  /**
+   * Walks a mark/reset-enabled iterator across all batches, checking positions, record counts,
+   * values and cached batches, and that mark() releases the batches before the marked one.
+   */
+  @Test
+  public void testMarkResetIterator(@Injectable final DrillbitContext bitContext,
+                                    @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    new NonStrictExpectations() {{
+      bitContext.getMetrics(); result = new MetricRegistry();
+      bitContext.getAllocator(); result = RootAllocatorFactory.newRoot(c);
+      bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(ClassPathScanner.fromPrescan(c));
+      bitContext.getConfig(); result = c;
+      bitContext.getCompiler(); result = CodeCompilerTestFactory.getTestCompiler(c);
+    }};
+
+    final RecordIterator iter = createIterator(bitContext, connection, true);
+
+    // BATCH 1: 1 record, starting outer position 0
+    iter.next();
+    assertFalse(iter.finished());
+    assertEquals(1, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(0, iter.getOuterPosition());
+    assertEquals(1, iter.cachedBatches().size());
+    final List<ValueVector> vectors = collectVectors(iter);
+    iter.mark(); // mark at position 0
+    assertTrue(checkValues(vectors, 0));
+
+    // BATCH 2: 100 records, starting outer position 1
+    iter.next();
+    assertFalse(iter.finished());
+    assertEquals(101, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(100, iter.getInnerRecordCount());
+    assertEquals(1, iter.getOuterPosition());
+    assertEquals(2, iter.cachedBatches().size());
+    checkAndAdvance(iter, vectors, 100);
+
+    // BATCH 3: 10 records, starting outer position 101
+    assertFalse(iter.finished());
+    assertEquals(111, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(10, iter.getInnerRecordCount());
+    assertEquals(101, iter.getOuterPosition());
+    assertEquals(3, iter.cachedBatches().size());
+    checkAndAdvance(iter, vectors, 10);
+
+    // BATCH 4: 10000 records, starting outer position 111
+    assertFalse(iter.finished());
+    assertEquals(10111, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(10000, iter.getInnerRecordCount());
+    assertEquals(111, iter.getOuterPosition());
+    assertEquals(4, iter.cachedBatches().size());
+    checkAndAdvance(iter, vectors, 10000);
+
+    // BATCH 5: 1 record, starting outer position 10111
+    assertFalse(iter.finished());
+    assertEquals(10112, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1, iter.getInnerRecordCount());
+    assertEquals(10111, iter.getOuterPosition());
+    assertEquals(5, iter.cachedBatches().size());
+    checkAndAdvance(iter, vectors, 1);
+
+    // BATCH 6: 1000 records, starting outer position 10112
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1000, iter.getInnerRecordCount());
+    assertEquals(10112, iter.getOuterPosition());
+    assertEquals(6, iter.cachedBatches().size());
+    checkAndAdvance(iter, vectors, 1000);
+    assertTrue(iter.finished());
+    // Still marked at batch 1, so every batch remains cached.
+    assertEquals(6, iter.cachedBatches().size());
+
+    // Reset back to the mark at batch 1.
+    iter.reset();
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(6, iter.cachedBatches().size());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1, iter.getInnerRecordCount());
+    assertTrue(checkValues(vectors, 0));
+
+    // Mark the start of batch 2, releasing batch 1.
+    iter.next();
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(5, iter.cachedBatches().size());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(100, iter.getInnerRecordCount());
+    advance(iter, 100);
+
+    // Mark the start of batch 3, releasing batch 2.
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(4, iter.cachedBatches().size());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(10, iter.getInnerRecordCount());
+    advance(iter, 10);
+
+    // Jump into the middle of the largest batch (#4) and mark there, releasing batch 3.
+    advance(iter, 5000);
+    assertEquals(4, iter.cachedBatches().size());
+    iter.mark();
+    assertEquals(3, iter.cachedBatches().size());
+    advance(iter, 5000);
+
+    // Mark the start of batch 5, releasing batch 4.
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(2, iter.cachedBatches().size());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1, iter.getInnerRecordCount());
+
+    // Move to the last batch and skip to its middle.
+    iter.next();
+    advance(iter, 500);
+    assertTrue(checkValues(vectors, 499));
+    assertTrue(checkValues(vectors, 500));
+
+    // Reset back to the mark at batch 5.
+    iter.reset();
+    assertTrue(checkValues(vectors, 0));
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(2, iter.cachedBatches().size());
+    assertEquals(0, iter.getCurrentPosition());
+    assertEquals(1, iter.getInnerRecordCount());
+
+    // Move to the middle of the last batch and mark there, releasing batch 5.
+    iter.next();
+    assertEquals(0, iter.getCurrentPosition());
+    advance(iter, 500);
+    iter.mark();
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(1, iter.cachedBatches().size());
+    assertEquals(500, iter.getCurrentPosition());
+    assertEquals(1000, iter.getInnerRecordCount());
+
+    // Run to the end, then reset back to the mark in the middle of batch 6.
+    advance(iter, 500);
+    assertTrue(iter.finished());
+    iter.reset();
+    assertFalse(iter.finished());
+    assertEquals(TOTAL_RECORDS, iter.getTotalRecordCount());
+    assertEquals(1, iter.cachedBatches().size());
+    assertEquals(500, iter.getCurrentPosition());
+    assertEquals(1000, iter.getInnerRecordCount());
+
+    // Closing the iterator releases every cached batch.
+    iter.close();
+    assertEquals(0, iter.cachedBatches().size());
+  }
+
+  /**
+   * Loads the mock-scan plan from /record/test_recorditerator.json and wraps its input batch in
+   * a {@link RecordIterator} with no outgoing batch, so the iterator calls incoming.next() itself.
+   *
+   * @param enableMarkAndReset whether mark(), reset() and forward() are allowed
+   */
+  private RecordIterator createIterator(DrillbitContext bitContext,
+                                        UserServer.UserClientConnection connection,
+                                        boolean enableMarkAndReset) throws Exception {
+    final PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
+    final String planStr = Files.toString(FileUtils.getResourceAsFile("/record/test_recorditerator.json"), Charsets.UTF_8);
+    final PhysicalPlan plan = reader.readPhysicalPlan(planStr);
+    final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+    final FragmentContext context = new FragmentContext(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
+    final List<PhysicalOperator> operatorList = plan.getSortedOperators(false);
+    final SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) operatorList.iterator().next()));
+
+    final RecordBatch singleBatch = exec.getIncoming();
+    final PhysicalOperator dummyPop = operatorList.iterator().next();
+    final OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+        OperatorContext.getChildCount(dummyPop));
+    final OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
+    return new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, enableMarkAndReset);
+  }
+
+  /** Collects the value vectors currently exposed by {@code iter}. */
+  private static List<ValueVector> collectVectors(RecordIterator iter) {
+    final List<ValueVector> vectors = Lists.newArrayList();
+    for (VectorWrapper<?> vw : iter) {
+      vectors.add(vw.getValueVector());
+    }
+    return vectors;
+  }
+
+  /** For each position i in [0, count): asserts the values at i, then calls iter.next(). */
+  private static void checkAndAdvance(RecordIterator iter, List<ValueVector> vectors, int count) {
+    for (int i = 0; i < count; i++) {
+      assertTrue(checkValues(vectors, i));
+      iter.next();
+    }
+  }
+
+  /** Calls iter.next() {@code count} times. */
+  private static void advance(RecordIterator iter, int count) {
+    for (int i = 0; i < count; i++) {
+      iter.next();
+    }
+  }
+
+  /**
+   * Returns true if every vector holds Integer.MIN_VALUE (even position) or Integer.MAX_VALUE
+   * (odd position) at {@code position}; logs and returns false when a value is not an Integer.
+   */
+  private static boolean checkValues(List<ValueVector> vectors, int position) {
+    final int expected = (position % 2 == 0) ? Integer.MIN_VALUE : Integer.MAX_VALUE;
+    for (ValueVector vv : vectors) {
+      final Object o = vv.getAccessor().getObject(position);
+      if (!(o instanceof Integer)) {
+        logger.error("Found wrong type {} at position {}", o == null ? null : o.getClass(), position);
+        return false;
+      }
+      if ((Integer) o != expected) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/2f0e3f27/exec/java-exec/src/test/resources/record/test_recorditerator.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/record/test_recorditerator.json b/exec/java-exec/src/test/resources/record/test_recorditerator.json
new file mode 100644
index 0000000..057466b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/record/test_recorditerator.json
@@ -0,0 +1,71 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph: [
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {
+ records: 1,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 100,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 10,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 10000,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 1,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ },
+ {
+ records: 1000,
+ types: [
+ {name: "k", type: "INT", mode: "REQUIRED"},
+ {name: "v", type: "INT", mode: "REQUIRED"}
+ ]
+ }
+ ]
+ },
+ {
+ @id:2,
+ child:1,
+ pop:"project",
+ exprs:[ { ref : "`*`", expr : "`*`"} ]
+ },
+ {
+ @id:3,
+ child:2,
+ pop:"screen"
+ }
+ ]
+}