You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/15 22:05:38 UTC
[06/14] git commit: DRILL-707 : Replace ValueAllocator with
allocateNewSafe() in SVR. WIP.
DRILL-707 : Replace ValueAllocator with allocateNewSafe() in SVR. WIP.
Remove ValueAllocator in SVR.
SV for Limit OP.
Selection vector remover. More WIP.
Code cleanup.
reverse rule change.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c7746ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c7746ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c7746ed5
Branch: refs/heads/master
Commit: c7746ed518adcacd844c9c5e35dcefa52949a773
Parents: 850f8c6
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Tue May 13 17:20:56 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 15 09:20:11 2014 -0700
----------------------------------------------------------------------
.../drill/exec/physical/config/Limit.java | 8 +++++
.../impl/svremover/CopierTemplate2.java | 30 +++++++++----------
.../impl/svremover/CopierTemplate4.java | 26 ++++++++--------
.../impl/svremover/RemovingRecordBatch.java | 31 ++++++++++++--------
.../org/apache/drill/TestExampleQueries.java | 23 ++++++++++++++-
.../exec/store/json/JsonRecordReader2Test.java | 2 +-
6 files changed, 76 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
index b926e3e..7d1d485 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
@@ -20,11 +20,13 @@ package org.apache.drill.exec.physical.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@JsonTypeName("limit")
public class Limit extends AbstractSingle {
@@ -60,4 +62,10 @@ public class Limit extends AbstractSingle {
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
return physicalVisitor.visitLimit(this, value);
}
+
+ @Override
+ public SelectionVectorMode getSVMode() {
+ return SelectionVectorMode.TWO_BYTE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 2f589a5..387497c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -20,38 +20,38 @@ package org.apache.drill.exec.physical.impl.svremover;
import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
public abstract class CopierTemplate2 implements Copier{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
-
+
private SelectionVector2 sv2;
- private VectorAllocator[] allocators;
private RecordBatch incoming;
-
+ private RecordBatch outgoing;
+
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
- this.allocators = allocators;
this.sv2 = incoming.getSelectionVector2();
this.incoming = incoming;
+ this.outgoing = outgoing;
doSetup(context, incoming, outgoing);
}
-
- private void allocateVectors(int recordCount){
- for(VectorAllocator a : allocators){
- a.alloc(recordCount);
- }
- }
-
+
@Override
public int copyRecords(int index, int recordCount){
- allocateVectors(recordCount);
+ for(VectorWrapper<?> out : outgoing){
+ out.getValueVector().allocateNewSafe();
+ }
+
int outgoingPosition = 0;
-
+
for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
if (!doEval(sv2.getIndex(svIndex), outgoingPosition)) {
break;
@@ -59,10 +59,10 @@ public abstract class CopierTemplate2 implements Copier{
}
return outgoingPosition;
}
-
+
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index a7aba6e..b48a8fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -29,30 +29,28 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
public abstract class CopierTemplate4 implements Copier{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
-
+
private SelectionVector4 sv4;
- private VectorAllocator[] allocators;
private RecordBatch incoming;
-
- private void allocateVectors(int recordCount){
- for(VectorAllocator a : allocators){
- a.alloc(recordCount);
- }
- }
-
+ private RecordBatch outgoing;
+
+
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
- this.allocators = allocators;
this.incoming = incoming;
+ this.outgoing = outgoing;
this.sv4 = incoming.getSelectionVector4();
doSetup(context, incoming, outgoing);
}
-
+
@Override
public int copyRecords(int index, int recordCount){
// logger.debug("Copying records.");
- allocateVectors(recordCount);
+ for(VectorWrapper<?> out : outgoing){
+ out.getValueVector().allocateNewSafe();
+ }
+
int outgoingPosition = 0;
for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
int deRefIndex = sv4.get(svIndex);
@@ -62,10 +60,10 @@ public abstract class CopierTemplate4 implements Copier{
}
return outgoingPosition;
}
-
+
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 62af0b2..2918fd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -30,13 +30,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -126,11 +120,18 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
}
}
+
+ logger.debug(String.format("doWork(): %s records copied for out of %s, remaining: %s, incoming schema %s ",
+ copiedRecords,
+ incoming.getRecordCount(),
+ incoming.getRecordCount() - remainderIndex,
+ incoming.getSchema()));
}
private void handleRemainder() {
int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
- int copiedRecords = copier.copyRecords(0, recordCount);
+ int copiedRecords = copier.copyRecords(0, remainingRecordCount);
+
if (copiedRecords < remainingRecordCount) {
for(VectorWrapper<?> v : container){
ValueVector.Mutator m = v.getValueVector().getMutator();
@@ -142,6 +143,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
for(VectorWrapper<?> v : container){
ValueVector.Mutator m = v.getValueVector().getMutator();
m.setValueCount(remainingRecordCount);
+ this.recordCount = remainingRecordCount;
}
if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
for(VectorWrapper<?> v: incoming) {
@@ -151,6 +153,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
remainderIndex = 0;
hasRemainder = false;
}
+ logger.debug(String.format("handleRemainder(): %s records copied for out of %s, remaining: %s, incoming schema ",
+ copiedRecords,
+ incoming.getRecordCount(),
+ incoming.getRecordCount() - remainderIndex,
+ incoming.getSchema()));
}
public void cleanup(){
@@ -196,18 +203,17 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
private Copier getGenerated2Copier() throws SchemaChangeException{
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
- List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : incoming){
ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
container.add(v);
- allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v));
}
try {
final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
generateCopies(cg.getRoot(), incoming, false);
Copier copier = context.getImplementationClass(cg);
- copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
+ copier.setupRemover(context, incoming, this, null);
+
return copier;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
@@ -221,19 +227,18 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
- List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : batch){
ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
container.add(v);
- allocators.add(getAllocator4(v));
}
try {
final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
generateCopies(cg.getRoot(), batch, true);
Copier copier = context.getImplementationClass(cg);
- copier.setupRemover(context, batch, outgoing, allocators.toArray(new VectorAllocator[allocators.size()]));
+ copier.setupRemover(context, batch, outgoing, null);
+
return copier;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 99940f4..83b43fb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -50,7 +50,28 @@ public class TestExampleQueries extends BaseTestQuery{
@Test
public void testSelectWithLimit() throws Exception{
- test("select employee_id, first_name, last_name from cp.`employee.json` order by employee_id limit 5 offset 10");
+ test("select employee_id, first_name, last_name from cp.`employee.json` limit 5 ");
+ }
+
+ @Test
+ public void testSelectWithLimit2() throws Exception{
+ test("select l_comment, l_orderkey from cp.`tpch/lineitem.parquet` limit 10000 ");
+ }
+
+ @Test
+ public void testSVRV4() throws Exception{
+ test("select employee_id, first_name from cp.`employee.json` order by employee_id ");
+ }
+
+ @Test
+ public void testSVRV4MultBatch() throws Exception{
+ test("select l_orderkey from cp.`tpch/lineitem.parquet` order by l_orderkey limit 10000 ");
+ }
+
+ @Test
+ public void testSVRV4Join() throws Exception{
+ test("select count(*) from cp.`tpch/lineitem.parquet` l, cp.`tpch/partsupp.parquet` ps \n" +
+ " where l.l_partkey = ps.ps_partkey and l.l_suppkey = ps.ps_suppkey ;");
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
index 7a21cee..84195c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
@@ -28,7 +28,7 @@ public class JsonRecordReader2Test extends BaseTestQuery{
@Test
public void testComplexJsonInput() throws Exception{
// test("select z[0]['orange'] from cp.`jsoninput/input2.json` limit 10");
- test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` limit 10");
+ test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` ");
test("select x from cp.`jsoninput/input2.json`");
// test("select z[0] from cp.`jsoninput/input2.json` limit 10");