You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/15 22:05:38 UTC

[06/14] git commit: DRILL-707 : Replace ValueAllocator with allocateNewSafe() in SVR. WIP.

DRILL-707 : Replace ValueAllocator with allocateNewSafe() in SVR. WIP.

Remove ValueAllocator in SVR.

Set selection vector mode (TWO_BYTE) for the Limit operator.

Selection vector remover. More WIP.

Code cleanup.

Revert rule change.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c7746ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c7746ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c7746ed5

Branch: refs/heads/master
Commit: c7746ed518adcacd844c9c5e35dcefa52949a773
Parents: 850f8c6
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Tue May 13 17:20:56 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 15 09:20:11 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/config/Limit.java       |  8 +++++
 .../impl/svremover/CopierTemplate2.java         | 30 +++++++++----------
 .../impl/svremover/CopierTemplate4.java         | 26 ++++++++--------
 .../impl/svremover/RemovingRecordBatch.java     | 31 ++++++++++++--------
 .../org/apache/drill/TestExampleQueries.java    | 23 ++++++++++++++-
 .../exec/store/json/JsonRecordReader2Test.java  |  2 +-
 6 files changed, 76 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
index b926e3e..7d1d485 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Limit.java
@@ -20,11 +20,13 @@ package org.apache.drill.exec.physical.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractSingle;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 @JsonTypeName("limit")
 public class Limit extends AbstractSingle {
@@ -60,4 +62,10 @@ public class Limit extends AbstractSingle {
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitLimit(this, value);
   }
+
+  @Override
+  public SelectionVectorMode getSVMode() {
+    return SelectionVectorMode.TWO_BYTE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 2f589a5..387497c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -20,38 +20,38 @@ package org.apache.drill.exec.physical.impl.svremover;
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
+
 public abstract class CopierTemplate2 implements Copier{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
-  
+
   private SelectionVector2 sv2;
-  private VectorAllocator[] allocators;
   private RecordBatch incoming;
-  
+  private RecordBatch outgoing;
+
   @Override
   public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
-    this.allocators = allocators;
     this.sv2 = incoming.getSelectionVector2();
     this.incoming = incoming;
+    this.outgoing = outgoing;
     doSetup(context, incoming, outgoing);
   }
-  
-  private void allocateVectors(int recordCount){
-    for(VectorAllocator a : allocators){
-      a.alloc(recordCount);
-    }
-  }
-  
+
   @Override
   public int copyRecords(int index, int recordCount){
-    allocateVectors(recordCount);
+    for(VectorWrapper<?> out : outgoing){
+      out.getValueVector().allocateNewSafe();
+    }
+
     int outgoingPosition = 0;
-    
+
     for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
       if (!doEval(sv2.getIndex(svIndex), outgoingPosition)) {
         break;
@@ -59,10 +59,10 @@ public abstract class CopierTemplate2 implements Copier{
     }
     return outgoingPosition;
   }
-  
+
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
-        
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index a7aba6e..b48a8fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -29,30 +29,28 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public abstract class CopierTemplate4 implements Copier{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
-  
+
   private SelectionVector4 sv4;
-  private VectorAllocator[] allocators;
   private RecordBatch incoming;
-  
-  private void allocateVectors(int recordCount){
-    for(VectorAllocator a : allocators){
-      a.alloc(recordCount);
-    }
-  }
-  
+  private RecordBatch outgoing;
+
+
   @Override
   public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
-    this.allocators = allocators;
     this.incoming = incoming;
+    this.outgoing = outgoing;
     this.sv4 = incoming.getSelectionVector4();
     doSetup(context, incoming, outgoing);
   }
-  
+
 
   @Override
   public int copyRecords(int index, int recordCount){
 //    logger.debug("Copying records.");
-    allocateVectors(recordCount);
+    for(VectorWrapper<?> out : outgoing){
+      out.getValueVector().allocateNewSafe();
+    }
+
     int outgoingPosition = 0;
     for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
       int deRefIndex = sv4.get(svIndex);
@@ -62,10 +60,10 @@ public abstract class CopierTemplate4 implements Copier{
     }
     return outgoingPosition;
   }
-  
+
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
-        
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 62af0b2..2918fd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -30,13 +30,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -126,11 +120,18 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
         }
       }
     }
+
+    logger.debug(String.format("doWork(): %s records copied for out of %s, remaining: %s, incoming schema %s ",
+        copiedRecords,
+        incoming.getRecordCount(),
+        incoming.getRecordCount() - remainderIndex,
+        incoming.getSchema()));
   }
 
   private void handleRemainder() {
     int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
-    int copiedRecords = copier.copyRecords(0, recordCount);
+    int copiedRecords = copier.copyRecords(0, remainingRecordCount);
+
     if (copiedRecords < remainingRecordCount) {
       for(VectorWrapper<?> v : container){
         ValueVector.Mutator m = v.getValueVector().getMutator();
@@ -142,6 +143,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       for(VectorWrapper<?> v : container){
         ValueVector.Mutator m = v.getValueVector().getMutator();
         m.setValueCount(remainingRecordCount);
+        this.recordCount = remainingRecordCount;
       }
       if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
         for(VectorWrapper<?> v: incoming) {
@@ -151,6 +153,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       remainderIndex = 0;
       hasRemainder = false;
     }
+    logger.debug(String.format("handleRemainder(): %s records copied for out of %s, remaining: %s, incoming schema ",
+        copiedRecords,
+        incoming.getRecordCount(),
+        incoming.getRecordCount() - remainderIndex,
+        incoming.getSchema()));
   }
 
   public void cleanup(){
@@ -196,18 +203,17 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   private Copier getGenerated2Copier() throws SchemaChangeException{
     Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
 
-    List<VectorAllocator> allocators = Lists.newArrayList();
     for(VectorWrapper<?> i : incoming){
       ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
       container.add(v);
-      allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v));
     }
 
     try {
       final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
       generateCopies(cg.getRoot(), incoming, false);
       Copier copier = context.getImplementationClass(cg);
-      copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
+      copier.setupRemover(context, incoming, this, null);
+
       return copier;
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
@@ -221,19 +227,18 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
   public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
 
-    List<VectorAllocator> allocators = Lists.newArrayList();
     for(VectorWrapper<?> i : batch){
 
       ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
       container.add(v);
-      allocators.add(getAllocator4(v));
     }
 
     try {
       final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
       generateCopies(cg.getRoot(), batch, true);
       Copier copier = context.getImplementationClass(cg);
-      copier.setupRemover(context, batch, outgoing, allocators.toArray(new VectorAllocator[allocators.size()]));
+      copier.setupRemover(context, batch, outgoing, null);
+
       return copier;
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 99940f4..83b43fb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -50,7 +50,28 @@ public class TestExampleQueries extends BaseTestQuery{
 
   @Test
   public void testSelectWithLimit() throws Exception{
-    test("select employee_id,  first_name, last_name from cp.`employee.json` order by employee_id limit 5 offset 10");
+    test("select employee_id,  first_name, last_name from cp.`employee.json` limit 5 ");
+  }
+
+  @Test
+  public void testSelectWithLimit2() throws Exception{
+    test("select l_comment, l_orderkey from cp.`tpch/lineitem.parquet` limit 10000 ");
+  }
+
+  @Test
+  public void testSVRV4() throws Exception{
+    test("select employee_id,  first_name from cp.`employee.json` order by employee_id ");
+  }
+
+  @Test
+  public void testSVRV4MultBatch() throws Exception{
+    test("select l_orderkey from cp.`tpch/lineitem.parquet` order by l_orderkey limit 10000 ");
+  }
+
+  @Test
+  public void testSVRV4Join() throws Exception{
+    test("select count(*) from cp.`tpch/lineitem.parquet` l, cp.`tpch/partsupp.parquet` ps \n" +
+        " where l.l_partkey = ps.ps_partkey and l.l_suppkey = ps.ps_suppkey ;");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7746ed5/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
index 7a21cee..84195c3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JsonRecordReader2Test.java
@@ -28,7 +28,7 @@ public class JsonRecordReader2Test extends BaseTestQuery{
   @Test
   public void testComplexJsonInput() throws Exception{
 //  test("select z[0]['orange']  from cp.`jsoninput/input2.json` limit 10");
-    test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink']  from cp.`jsoninput/input2.json` limit 10");
+    test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink']  from cp.`jsoninput/input2.json` ");
     test("select x from cp.`jsoninput/input2.json`");
 
 //    test("select z[0]  from cp.`jsoninput/input2.json` limit 10");