You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/06 23:46:31 UTC
[2/3] Sort operator using HyperBatch concept. Supports single and
multikey asc/desc sort. One comparator function definition. Add abstract
record batch,
vector container and vector wrapper. Various fixes to return to client when
query fails.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
new file mode 100644
index 0000000..daf96fc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.collect.ArrayListMultimap;
+
+public class SortRecordBatchBuilder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
+
+ private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
+ private final VectorContainer container;
+
+ private int recordCount;
+ private long runningBytes;
+ private long runningBatches;
+ private final long maxBytes;
+ private SelectionVector4 sv4;
+ final PreAllocator svAllocator;
+
+ public SortRecordBatchBuilder(BufferAllocator a, long maxBytes, VectorContainer container){
+ this.maxBytes = maxBytes;
+ this.svAllocator = a.getPreAllocator();
+ this.container = container;
+ }
+
+ private long getSize(RecordBatch batch){
+ long bytes = 0;
+ for(VectorWrapper<?> v : batch){
+ bytes += v.getValueVector().getBufferSize();
+ }
+ return bytes;
+ }
+
+ /**
+ * Add another record batch to the set of record batches.
+ * @param batch
+ * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
+ * @throws SchemaChangeException
+ */
+ public boolean add(RecordBatch batch){
+ if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+
+ long batchBytes = getSize(batch);
+ if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
+ if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
+ if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+
+
+ RecordBatchData bd = new RecordBatchData(batch);
+ runningBytes += batchBytes;
+ batches.put(batch.getSchema(), bd);
+ recordCount += bd.getRecordCount();
+ return true;
+ }
+
+ public void build(FragmentContext context) throws SchemaChangeException{
+ container.clear();
+ if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
+ if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+ BatchSchema schema = batches.keySet().iterator().next();
+ List<RecordBatchData> data = batches.get(schema);
+
+ // now we're going to generate the sv4 pointers
+ switch(schema.getSelectionVectorMode()){
+ case NONE: {
+ int index = 0;
+ int recordBatchId = 0;
+ for(RecordBatchData d : data){
+ for(int i =0; i < d.getRecordCount(); i++, index++){
+ sv4.set(index, recordBatchId, i);
+ }
+ recordBatchId++;
+ }
+ break;
+ }
+ case TWO_BYTE: {
+ int index = 0;
+ int recordBatchId = 0;
+ for(RecordBatchData d : data){
+ for(int i =0; i < d.getRecordCount(); i++, index++){
+ sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+ }
+ // might as well drop the selection vector since we'll stop using it now.
+ d.getSv2().clear();
+ recordBatchId++;
+ }
+ break;
+ }
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ // next, we'll create lists of each of the vector types.
+ ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
+ for(RecordBatchData rbd : batches.values()){
+ for(ValueVector v : rbd.vectors){
+ vectors.put(v.getField(), v);
+ }
+ }
+
+ for(MaterializedField f : vectors.keySet()){
+ List<ValueVector> v = vectors.get(f);
+ container.addHyperList(v);
+ }
+
+ container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+ }
+
+ public SelectionVector4 getSv4() {
+ return sv4;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
new file mode 100644
index 0000000..c45f500
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -0,0 +1,45 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public abstract class SortTemplate implements Sorter, IndexedSortable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortTemplate.class);
+
+ private SelectionVector4 vector4;
+
+
+ public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException{
+ // we pass in the local hyperBatch since that is where we'll be reading data.
+ vector4 = hyperBatch.getSelectionVector4();
+ doSetup(context, hyperBatch, null);
+ }
+
+ @Override
+ public void sort(SelectionVector4 vector4, VectorContainer container){
+ QuickSort qs = new QuickSort();
+ qs.sort(this, 0, vector4.getTotalCount());
+ }
+
+ @Override
+ public void swap(int sv0, int sv1) {
+ int tmp = vector4.get(sv0);
+ vector4.set(sv0, vector4.get(sv1));
+ vector4.set(sv1, tmp);
+ }
+
+ @Override
+ public int compare(int inIndex, int outIndex) {
+ int sv1 = vector4.get(inIndex);
+ int sv2 = vector4.get(outIndex);
+ return doEval(sv1, sv2);
+ }
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract int doEval(int inIndex, int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
new file mode 100644
index 0000000..bc4fae5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public interface Sorter {
+ public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException;
+ public void sort(SelectionVector4 vector4, VectorContainer container);
+
+ public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>( //
+ Sorter.class, "org.apache.drill.exec.physical.impl.sort.SortTemplate", Comparator.class, int.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 55d6ba2..ce17a2b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -8,8 +8,11 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
public interface Copier {
- public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION = new TemplateClassDefinition<Copier>( //
- Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate", CopyEvaluator.class, null);
+ public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>( //
+ Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate2", CopyEvaluator.class, null);
+
+ public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>( //
+ Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate4", CopyEvaluator.class, null);
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
public abstract void copyRecords();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
deleted file mode 100644
index 6a0e2c3..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.drill.exec.physical.impl.svremover;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class CopierTemplate implements Copier{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate.class);
-
- private SelectionVector2 sv2;
- private VectorAllocator[] allocators;
-
- @Override
- public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
- this.allocators = allocators;
- this.sv2 = incoming.getSelectionVector2();
- doSetup(context, incoming, outgoing);
- }
-
- private void allocateVectors(int recordCount){
- for(VectorAllocator a : allocators){
- a.alloc(recordCount);
- }
- }
-
- @Override
- public void copyRecords(){
- final int recordCount = sv2.getCount();
- allocateVectors(recordCount);
- int outgoingPosition = 0;
-
- for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
- doEval(svIndex, outgoingPosition);
- }
- }
-
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract void doEval(int incoming, int outgoing);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
new file mode 100644
index 0000000..4dc38f2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -0,0 +1,43 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class CopierTemplate2 implements Copier{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
+
+ private SelectionVector2 sv2;
+ private VectorAllocator[] allocators;
+
+ @Override
+ public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+ this.allocators = allocators;
+ this.sv2 = incoming.getSelectionVector2();
+ doSetup(context, incoming, outgoing);
+ }
+
+ private void allocateVectors(int recordCount){
+ for(VectorAllocator a : allocators){
+ a.alloc(recordCount);
+ }
+ }
+
+ @Override
+ public void copyRecords(){
+ final int recordCount = sv2.getCount();
+ allocateVectors(recordCount);
+ int outgoingPosition = 0;
+
+ for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
+ doEval(svIndex, outgoingPosition);
+ }
+ }
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract void doEval(int incoming, int outgoing);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
new file mode 100644
index 0000000..2cf033e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class CopierTemplate4 implements Copier{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
+
+ private SelectionVector4 sv4;
+ private VectorAllocator[] allocators;
+
+
+ private void allocateVectors(int recordCount){
+ for(VectorAllocator a : allocators){
+ a.alloc(recordCount);
+ }
+ }
+
+ @Override
+ public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+ this.allocators = allocators;
+ this.sv4 = incoming.getSelectionVector4();
+ doSetup(context, incoming, outgoing);
+ }
+
+
+ @Override
+ public void copyRecords(){
+ final int recordCount = sv4.getLength();
+ allocateVectors(recordCount);
+ int outgoingPosition = 0;
+ final int end = sv4.getStart() + sv4.getLength();
+ for(int svIndex = sv4.getStart(); svIndex < end; svIndex++, outgoingPosition++){
+ int deRefIndex = sv4.get(svIndex);
+ doEval(deRefIndex, outgoingPosition);
+ }
+ }
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract void doEval(int incoming, int outgoing);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 68793b0..64e89ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -1,24 +1,22 @@
package org.apache.drill.exec.physical.impl.svremover;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.VectorHolder;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
@@ -31,36 +29,14 @@ import com.sun.codemodel.JExpression;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
-public class RemovingRecordBatch implements RecordBatch{
+public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
- private final RecordBatch incoming;
- private final FragmentContext context;
- private BatchSchema outSchema;
private Copier copier;
- private List<ValueVector> outputVectors;
- private VectorHolder vh;
private int recordCount;
- public RemovingRecordBatch(RecordBatch incoming, FragmentContext context){
- this.incoming = incoming;
- this.context = context;
- }
-
- @Override
- public Iterator<ValueVector> iterator() {
- return outputVectors.iterator();
- }
-
- @Override
- public FragmentContext getContext() {
- return context;
- }
-
- @Override
- public BatchSchema getSchema() {
- Preconditions.checkNotNull(outSchema);
- return outSchema;
+ public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) {
+ super(popConfig, context, incoming);
}
@Override
@@ -69,64 +45,36 @@ public class RemovingRecordBatch implements RecordBatch{
}
@Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return vh.getValueVector(path);
- }
+ protected void setupNewSchema() throws SchemaChangeException {
+ container.clear();
+
+ switch(incoming.getSchema().getSelectionVectorMode()){
+ case NONE:
+ this.copier = getStraightCopier();
+ break;
+ case TWO_BYTE:
+ this.copier = getGenerated2Copier();
+ break;
+ case FOUR_BYTE:
+ this.copier = getGenerated4Copier();
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ container.buildSchema(SelectionVectorMode.NONE);
- @Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
- return vh.getValueVector(fieldId, clazz);
}
@Override
- public IterOutcome next() {
- recordCount = 0;
- IterOutcome upstream = incoming.next();
- logger.debug("Upstream... {}", upstream);
- switch(upstream){
- case NONE:
- case NOT_YET:
- case STOP:
- return upstream;
- case OK_NEW_SCHEMA:
- try{
- copier = createCopier();
- }catch(SchemaChangeException ex){
- incoming.kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
- }
- // fall through.
- case OK:
- recordCount = incoming.getRecordCount();
- copier.copyRecords();
- for(ValueVector v : this.outputVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(recordCount);
- }
- return upstream; // change if upstream changed, otherwise normal.
- default:
- throw new UnsupportedOperationException();
+ protected void doWork() {
+ recordCount = incoming.getRecordCount();
+ copier.copyRecords();
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(recordCount);
}
}
-
-
private class StraightCopier implements Copier{
@@ -136,8 +84,8 @@ public class RemovingRecordBatch implements RecordBatch{
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
- for(ValueVector vv : incoming){
- TransferPair tp = vv.getTransferPair();
+ for(VectorWrapper<?> vv : incoming){
+ TransferPair tp = vv.getValueVector().getTransferPair();
pairs.add(tp);
out.add(tp.getTo());
}
@@ -159,23 +107,23 @@ public class RemovingRecordBatch implements RecordBatch{
private Copier getStraightCopier(){
StraightCopier copier = new StraightCopier();
copier.setupRemover(context, incoming, this, null);
- outputVectors.addAll(copier.getOut());
+ container.addCollection(copier.getOut());
return copier;
}
- private Copier getGeneratedCopier() throws SchemaChangeException{
+ private Copier getGenerated2Copier() throws SchemaChangeException{
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
List<VectorAllocator> allocators = Lists.newArrayList();
- for(ValueVector i : incoming){
- TransferPair t = i.getTransferPair();
- outputVectors.add(t.getTo());
- allocators.add(getAllocator(i, t.getTo()));
+ for(VectorWrapper<?> i : incoming){
+ ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ container.add(v);
+ allocators.add(getAllocator(i.getValueVector(), v));
}
try {
- final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- generateCopies(cg);
+ final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
+ generateCopies(cg, false);
Copier copier = context.getImplementationClass(cg);
copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
return copier;
@@ -184,73 +132,77 @@ public class RemovingRecordBatch implements RecordBatch{
}
}
-
- private Copier createCopier() throws SchemaChangeException{
- if(outputVectors != null){
- for(ValueVector v : outputVectors){
- v.close();
- }
- }
- this.outputVectors = Lists.newArrayList();
- this.vh = new VectorHolder(outputVectors);
-
- SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
- for(ValueVector v : incoming){
- bldr.addField(v.getField());
- }
- this.outSchema = bldr.build();
+ private Copier getGenerated4Copier() throws SchemaChangeException{
+ Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
- switch(incoming.getSchema().getSelectionVectorMode()){
- case NONE:
- return getStraightCopier();
- case TWO_BYTE:
- return getGeneratedCopier();
- default:
- throw new UnsupportedOperationException();
+ List<VectorAllocator> allocators = Lists.newArrayList();
+ for(VectorWrapper<?> i : incoming){
+
+ ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ container.add(v);
+ allocators.add(getAllocator4(v));
}
+ try {
+ final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
+ generateCopies(cg, true);
+ Copier copier = context.getImplementationClass(cg);
+ copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
+ return copier;
+ } catch (ClassTransformationException | IOException e) {
+ throw new SchemaChangeException("Failure while attempting to load generated class", e);
+ }
}
- private void generateCopies(CodeGenerator<Copier> g){
+ private void generateCopies(CodeGenerator<Copier> g, boolean hyper){
// we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
int fieldId = 0;
-
-
JExpression inIndex = JExpr.direct("inIndex");
JExpression outIndex = JExpr.direct("outIndex");
g.rotateBlock();
- for(ValueVector vv : incoming){
- JClass vvClass = (JClass) g.getModel()._ref(vv.getClass());
- JVar inVV = declareVVSetup("incoming", g, fieldId, vvClass);
- JVar outVV = declareVVSetup("outgoing", g, fieldId, vvClass);
+ for(VectorWrapper<?> vv : incoming){
+ JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+
+ if(hyper){
+
+ g.getBlock().add(
+ outVV
+ .invoke("copyFrom")
+ .arg(
+ inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(outIndex)
+ .arg(
+ inVV.component(inIndex.shrz(JExpr.lit(16)))
+ )
+ );
+ }else{
+ g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
+ }
- g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
fieldId++;
}
}
- private JVar declareVVSetup(String varName, CodeGenerator<?> g, int fieldId, JClass vvClass){
- JVar vv = g.declareClassField("vv", vvClass);
- JClass t = (JClass) g.getModel()._ref(SchemaChangeException.class);
- JType objClass = g.getModel()._ref(Object.class);
- JBlock b = g.getSetupBlock();
- JVar obj = b.decl( //
- objClass, //
- g.getNextVar("tmp"), //
- JExpr.direct(varName).invoke("getValueVectorById").arg(JExpr.lit(fieldId)).arg( vvClass.dotclass()));
- b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id %d", vv.name(), fieldId))));
- b.assign(vv, JExpr.cast(vvClass, obj));
-
- return vv;
- }
-
+
@Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
+ private VectorAllocator getAllocator4(ValueVector outgoing){
+ if(outgoing instanceof FixedWidthVector){
+ return new FixedVectorAllocator((FixedWidthVector) outgoing);
+ }else if(outgoing instanceof VariableWidthVector ){
+ return new VariableEstimatedVector( (VariableWidthVector) outgoing, 50);
+ }else{
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
if(outgoing instanceof FixedWidthVector){
return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -273,11 +225,23 @@ public class RemovingRecordBatch implements RecordBatch{
out.allocateNew(recordCount);
out.getMutator().setValueCount(recordCount);
}
-
+ }
+
+ private class VariableEstimatedVector implements VectorAllocator{
+ VariableWidthVector out;
+ int avgWidth;
+ public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
+ super();
+ this.out = out;
+ this.avgWidth = avgWidth;
+ }
+ public void alloc(int recordCount){
+ out.allocateNew(avgWidth * recordCount, recordCount);
+ out.getMutator().setValueCount(recordCount);
+ }
}
-
private class VariableVectorAllocator implements VectorAllocator{
VariableWidthVector in;
VariableWidthVector out;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 4671baa..88708e2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -16,7 +16,7 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
@Override
public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
- return new RemovingRecordBatch(children.iterator().next(), context);
+ return new RemovingRecordBatch(config, context, children.iterator().next());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
new file mode 100644
index 0000000..a2584b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -0,0 +1,78 @@
+package org.apache.drill.exec.record;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+ protected final VectorContainer container = new VectorContainer();
+ protected final T popConfig;
+ protected final FragmentContext context;
+
+ protected AbstractRecordBatch(T popConfig, FragmentContext context) {
+ super();
+ this.context = context;
+ this.popConfig = popConfig;
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ @Override
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return container.getSchema();
+ }
+
+ @Override
+ public void kill() {
+ container.clear();
+ killIncoming();
+ cleanup();
+ }
+
+ protected abstract void killIncoming();
+
+ protected void cleanup(){
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVector(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+ return container.getVectorAccessor(fieldId, clazz);
+ }
+
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return WritableBatch.get(this);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
new file mode 100644
index 0000000..10cc7ab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -0,0 +1,52 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingleRecordBatch.class);
+
+ protected final RecordBatch incoming;
+
+ public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) {
+ super(popConfig, context);
+ this.incoming = incoming;
+ }
+
+ @Override
+ protected void killIncoming() {
+ incoming.kill();
+ }
+
+ @Override
+ public IterOutcome next() {
+ IterOutcome upstream = incoming.next();
+
+ switch(upstream){
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ container.clear();
+ return upstream;
+ case OK_NEW_SCHEMA:
+ try{
+ setupNewSchema();
+ }catch(SchemaChangeException ex){
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ // fall through.
+ case OK:
+ doWork();
+ return upstream; // change if upstream changed, otherwise normal.
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected abstract void setupNewSchema() throws SchemaChangeException;
+ protected abstract void doWork();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
new file mode 100644
index 0000000..e8a5cf8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -0,0 +1,54 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
+
+ private T[] vectors;
+ private MaterializedField f;
+
+ public HyperVectorWrapper(MaterializedField f, T[] v){
+ assert(v.length > 0);
+ this.f = f;
+ this.vectors = v;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<T> getVectorClass() {
+ return (Class<T>) vectors.getClass().getComponentType();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return f;
+ }
+
+ @Override
+ public T getValueVector() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T[] getValueVectors() {
+ return vectors;
+ }
+
+ @Override
+ public boolean isHyper() {
+ return true;
+ }
+
+ @Override
+ public void release() {
+ for(T x : vectors){
+ x.clear();
+ }
+
+ }
+
+ public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v){
+ return new HyperVectorWrapper<T>(f, v);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
deleted file mode 100644
index d820e0e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-
-public class InvalidValueAccessor extends ExecutionSetupException{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InvalidValueAccessor.class);
-
- public InvalidValueAccessor() {
- super();
- }
-
- public InvalidValueAccessor(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
- public InvalidValueAccessor(String message, Throwable cause) {
- super(message, cause);
- }
-
- public InvalidValueAccessor(String message) {
- super(message);
- }
-
- public InvalidValueAccessor(Throwable cause) {
- super(cause);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index fc4e759..f747968 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.record;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -32,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector;
* A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
* provided utilizing getValueVectorId();
*/
-public interface RecordBatch extends Iterable<ValueVector> {
+public interface RecordBatch extends Iterable<VectorWrapper<?>> {
/**
* Describes the outcome of a RecordBatch being incremented forward.
@@ -93,8 +92,7 @@ public interface RecordBatch extends Iterable<ValueVector> {
* TypedFieldId
*/
public abstract TypedFieldId getValueVectorId(SchemaPath path);
-
- public abstract <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz);
+ public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
/**
* Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
@@ -112,43 +110,4 @@ public interface RecordBatch extends Iterable<ValueVector> {
*/
public WritableBatch getWritableBatch();
- public static class TypedFieldId {
- final MajorType type;
- final int fieldId;
-
- public TypedFieldId(MajorType type, int fieldId) {
- super();
- this.type = type;
- this.fieldId = fieldId;
- }
-
- public MajorType getType() {
- return type;
- }
-
- public int getFieldId() {
- return fieldId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TypedFieldId other = (TypedFieldId) obj;
- if (fieldId != other.fieldId)
- return false;
- if (type == null) {
- if (other.type != null)
- return false;
- } else if (!type.equals(other.type))
- return false;
- return true;
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ebacd6e..593c28c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -29,18 +29,17 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-public class RecordBatchLoader implements Iterable<ValueVector>{
+public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
- private List<ValueVector> vectors = Lists.newArrayList();
+ private VectorContainer container = new VectorContainer();
private final BufferAllocator allocator;
private int recordCount;
private BatchSchema schema;
@@ -66,11 +65,12 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
boolean schemaChanged = false;
Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
- for(ValueVector v : this.vectors){
+ for(VectorWrapper<?> w : container){
+ ValueVector v = w.getValueVector();
oldFields.put(v.getField(), v);
}
- List<ValueVector> newVectors = Lists.newArrayList();
+ VectorContainer newVectors = new VectorContainer();
List<FieldMetadata> fields = def.getFieldList();
@@ -79,7 +79,7 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
FieldDef fieldDef = fmd.getDef();
ValueVector v = oldFields.remove(fieldDef);
if(v != null){
- newVectors.add(v);
+ container.add(v);
continue;
}
@@ -101,49 +101,51 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
// rebuild the schema.
SchemaBuilder b = BatchSchema.newBuilder();
- for(ValueVector v : newVectors){
+ for(VectorWrapper<?> v : newVectors){
b.addField(v.getField());
}
b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
this.schema = b.build();
- vectors = ImmutableList.copyOf(newVectors);
+ container = newVectors;
return schemaChanged;
}
- public TypedFieldId getValueVector(SchemaPath path) {
- for(int i =0; i < vectors.size(); i++){
- ValueVector vv = vectors.get(i);
- if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i);
- }
- return null;
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVector(path);
}
- @SuppressWarnings("unchecked")
- public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
- ValueVector v = vectors.get(fieldId);
- assert v != null;
- if (v.getClass() != clazz){
- logger.warn(String.format(
- "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
- clazz.getCanonicalName(), v.getClass().getCanonicalName()));
- return null;
- }
- return (T) v;
- }
+
+
+//
+// @SuppressWarnings("unchecked")
+// public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
+// ValueVector v = container.get(fieldId);
+// assert v != null;
+// if (v.getClass() != clazz){
+// logger.warn(String.format(
+// "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
+// clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+// return null;
+// }
+// return (T) v;
+// }
public int getRecordCount() {
return recordCount;
}
-
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
+ return container.getVectorAccessor(fieldId, clazz);
+ }
+
public WritableBatch getWritableBatch(){
- return WritableBatch.getBatchNoSV(recordCount, vectors);
+ return WritableBatch.getBatchNoSVWrap(recordCount, container);
}
@Override
- public Iterator<ValueVector> iterator() {
- return this.vectors.iterator();
+ public Iterator<VectorWrapper<?>> iterator() {
+ return this.container.iterator();
}
public BatchSchema getSchema(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
deleted file mode 100644
index 9bc6e5f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-public class RecordMaker {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordMaker.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
deleted file mode 100644
index 86c963d..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.drill.exec.record;
-
-/**
- * Remove the selection vector from a record batch.
- */
-public class RecordRemapper {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRemapper.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
new file mode 100644
index 0000000..94700a2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -0,0 +1,49 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
+
+ private T v;
+
+ public SimpleVectorWrapper(T v){
+ this.v = v;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<T> getVectorClass() {
+ return (Class<T>) v.getClass();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return v.getField();
+ }
+
+ @Override
+ public T getValueVector() {
+ return v;
+ }
+
+ @Override
+ public T[] getValueVectors() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isHyper() {
+ return false;
+ }
+
+
+ @Override
+ public void release() {
+ v.clear();
+ }
+
+ public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
+ return new SimpleVectorWrapper<T>(v);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
new file mode 100644
index 0000000..3905fa8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+public class TypedFieldId {
+ final MajorType type;
+ final int fieldId;
+ final boolean isHyperReader;
+
+ public TypedFieldId(MajorType type, int fieldId){
+ this(type, fieldId, false);
+ }
+
+ public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+ super();
+ this.type = type;
+ this.fieldId = fieldId;
+ this.isHyperReader = isHyper;
+ }
+
+ public boolean isHyperReader(){
+ return isHyperReader;
+ }
+
+ public MajorType getType() {
+ return type;
+ }
+
+ public int getFieldId() {
+ return fieldId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TypedFieldId other = (TypedFieldId) obj;
+ if (fieldId != other.fieldId)
+ return false;
+ if (type == null) {
+ if (other.type != null)
+ return false;
+ } else if (!type.equals(other.type))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
new file mode 100644
index 0000000..923fbd5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.record;
+
+import java.lang.reflect.Array;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
+
+public class VectorContainer implements Iterable<VectorWrapper<?>> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
+
+ private final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
+ private BatchSchema schema;
+
+ public VectorContainer() {
+ }
+
+ public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) {
+ assert !vectors.isEmpty() || !hyperVectors.isEmpty();
+
+ addCollection(vectors);
+
+ for (ValueVector[] vArr : hyperVectors) {
+ add(vArr);
+ }
+ }
+
+ public void addHyperList(List<ValueVector> vectors){
+ schema = null;
+ ValueVector[] vv = new ValueVector[vectors.size()];
+ for(int i =0; i < vv.length; i++){
+ vv[i] = vectors.get(i);
+ }
+ add(vv);
+ }
+
+ public void addCollection(Iterable<ValueVector> vectors) {
+ schema = null;
+ for (ValueVector vv : vectors) {
+ wrappers.add(SimpleVectorWrapper.create(vv));
+ }
+ }
+
+ public TypedFieldId add(ValueVector vv) {
+ schema = null;
+ int i = wrappers.size();
+ wrappers.add(SimpleVectorWrapper.create(vv));
+ return new TypedFieldId(vv.getField().getType(), i, false);
+ }
+
+ public void add(ValueVector[] hyperVector) {
+ assert hyperVector.length != 0;
+ schema = null;
+ Class<?> clazz = hyperVector[0].getClass();
+ ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, hyperVector.length);
+ for (int i = 0; i < hyperVector.length; i++) {
+ c[i] = hyperVector[i];
+ }
+ // todo: work with a merged schema.
+ wrappers.add(HyperVectorWrapper.create(hyperVector[0].getField(), c));
+ }
+
+ public void remove(ValueVector v) {
+ schema = null;
+ for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
+ VectorWrapper<?> w = iter.next();
+ if (!w.isHyper() && v == w.getValueVector()) {
+ iter.remove();
+ return;
+ }
+ }
+
+ throw new IllegalStateException("You attempted to remove a vector that didn't exist.");
+ }
+
+ public TypedFieldId getValueVector(SchemaPath path) {
+ for (int i = 0; i < wrappers.size(); i++) {
+ VectorWrapper<?> va = wrappers.get(i);
+ if (va.getField().matches(path))
+ return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends ValueVector> VectorWrapper<T> getVectorAccessor(int fieldId, Class<?> clazz) {
+ VectorWrapper<?> va = wrappers.get(fieldId);
+ assert va != null;
+ if (va.getVectorClass() != clazz) {
+ logger.warn(String.format(
+ "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
+ clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
+ return null;
+ }
+ return (VectorWrapper<T>) va;
+ }
+
+ public BatchSchema getSchema(){
+ Preconditions.checkNotNull(schema, "Schema is currently null. You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
+ return schema;
+ }
+
+ public void buildSchema(SelectionVectorMode mode) {
+ SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
+ for (VectorWrapper<?> v : wrappers) {
+ bldr.addField(v.getField());
+ }
+ this.schema = bldr.build();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return wrappers.iterator();
+ }
+
+ public void clear() {
+ // TODO: figure out a better approach for this.
+ // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no data.
+ // schema = null;
+ for (VectorWrapper<?> w : wrappers) {
+ w.release();
+ }
+ wrappers.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
new file mode 100644
index 0000000..e40dee4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -0,0 +1,14 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+
+public interface VectorWrapper<T extends ValueVector> {
+
+ public Class<T> getVectorClass();
+ public MaterializedField getField();
+ public T getValueVector();
+ public T[] getValueVectors();
+ public boolean isHyper();
+ public void release();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 685cc77..e84bf37 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@@ -57,6 +58,15 @@ public class WritableBatch {
return buffers;
}
+ public static WritableBatch getBatchNoSVWrap(int recordCount, Iterable<VectorWrapper<?>> vws) {
+ List<ValueVector> vectors = Lists.newArrayList();
+ for(VectorWrapper<?> vw : vws){
+ Preconditions.checkArgument(!vw.isHyper());
+ vectors.add(vw.getValueVector());
+ }
+ return getBatchNoSV(recordCount, vectors);
+ }
+
public static WritableBatch getBatchNoSV(int recordCount, Iterable<ValueVector> vectors) {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
@@ -83,7 +93,7 @@ public class WritableBatch {
public static WritableBatch get(RecordBatch batch) {
if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE) throw new UnsupportedOperationException("Only batches without selections vectors are writable.");
- return getBatchNoSV(batch.getRecordCount(), batch);
+ return getBatchNoSVWrap(batch.getRecordCount(), batch);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 1f3874f..2020f92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -20,22 +20,58 @@ package org.apache.drill.exec.record.selection;
import io.netty.buffer.ByteBuf;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.exception.SchemaChangeException;
public class SelectionVector4 {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
- private final BufferAllocator allocator;
- private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
-
- public SelectionVector4(BufferAllocator allocator) {
- this.allocator = allocator;
+ private final ByteBuf vector;
+ private final int recordCount;
+ private int start;
+ private int length;
+
+ public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
+ if(recordCount > Integer.MAX_VALUE /4) throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. You requested an allocation of %d bytes.", recordCount * 4));
+ this.recordCount = recordCount;
+ this.start = 0;
+ this.length = Math.min(batchRecordCount, recordCount);
+ this.vector = vector;
}
-
- public int getCount(){
- return -1;
+
+ public int getTotalCount(){
+ return recordCount;
+ }
+
+ public int getCurrentCount(){
+ return length;
+ }
+
+ public void set(int index, int compound){
+ vector.setInt(index*4, compound);
+ }
+ public void set(int index, int recordBatch, int recordIndex){
+ vector.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535));
+ }
+
+ public int get(int index){
+ return vector.getInt(index*4);
}
+ public int getStart() {
+ return start;
+ }
+ public int getLength() {
+ return length;
+ }
+
+ public boolean next(){
+ if(start + length == recordCount) return false;
+ start = start+length;
+ int newEnd = Math.min(start+length, recordCount);
+ length = newEnd - start;
+ return true;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
new file mode 100644
index 0000000..722f4d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.record.selection;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.collect.Lists;
+
+public class SelectionVector4Builder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4Builder.class);
+
+ private List<BatchSchema> schemas = Lists.newArrayList();
+
+ public void add(RecordBatch batch, boolean newSchema) throws SchemaChangeException{
+ if(!schemas.isEmpty() && newSchema) throw new SchemaChangeException("Currently, the sv4 builder doesn't support embedded types");
+ if(newSchema){
+ schemas.add(batch.getSchema());
+ }
+
+ }
+
+
+ // deals with managing selection vectors.
+ // take a four byte int
+ /**
+ * take a four byte value
+ * use the first two as a pointer. use the other two as a
+ *
+ * we should manage an array of valuevectors
+ */
+
+ private class VectorSchemaBuilder{
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 36fd199..b2283a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
@@ -54,29 +55,40 @@ public class QueryResultHandler {
final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
final QueryResultBatch batch = new QueryResultBatch(result, dBody);
UserResultsListener l = resultsListener.get(result.getQueryId());
+
+ boolean failed = batch.getHeader().getQueryState() == QueryState.FAILED;
// logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
- if (l != null) {
- // logger.debug("Results listener available, using existing.");
- l.resultArrived(batch);
- if (result.getIsLastChunk()) {
- resultsListener.remove(result.getQueryId(), l);
- }
- } else {
- logger.debug("Results listener not available, creating a buffering listener.");
- // manage race condition where we start getting results before we receive the queryid back.
+ if (l == null) {
BufferingListener bl = new BufferingListener();
l = resultsListener.putIfAbsent(result.getQueryId(), bl);
- if (l != null) {
- l.resultArrived(batch);
- } else {
- bl.resultArrived(batch);
- }
+ // if we had a succesful insert, use that reference. Otherwise, just throw away the new bufering listener.
+ if (l == null) l = bl;
}
+
+ if(failed){
+ l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
+ resultsListener.remove(result.getQueryId(), l);
+ }else{
+ l.resultArrived(batch);
+ }
+
+ if (
+ (failed || result.getIsLastChunk())
+ &&
+ (!(l instanceof BufferingListener) || ((BufferingListener)l).output != null)
+ ) {
+ resultsListener.remove(result.getQueryId(), l);
+ }
+
}
+
+
private class BufferingListener implements UserResultsListener {
private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+ private volatile boolean finished = false;
+ private volatile RpcException ex;
private volatile UserResultsListener output;
public boolean transferTo(UserResultsListener l) {
@@ -87,12 +99,19 @@ public class QueryResultHandler {
l.resultArrived(r);
last = r.getHeader().getIsLastChunk();
}
+ if(ex != null){
+ l.submissionFailed(ex);
+ return true;
+ }
return last;
}
}
+
@Override
public void resultArrived(QueryResultBatch result) {
+ if(result.getHeader().getIsLastChunk()) finished = true;
+
synchronized (this) {
if (output == null) {
this.results.add(result);
@@ -104,7 +123,18 @@ public class QueryResultHandler {
@Override
public void submissionFailed(RpcException ex) {
- throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+ finished = true;
+ synchronized (this) {
+ if (output == null){
+ this.ex = ex;
+ } else{
+ output.submissionFailed(ex);
+ }
+ }
+ }
+
+ public boolean isFinished(){
+ return finished;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ad54a07..1b1e39a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -28,6 +28,7 @@ abstract class BaseDataValueVector extends BaseValueVector{
valueCount = 0;
}
}
+
@Override
public ByteBuf[] getBuffers(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
new file mode 100644
index 0000000..d43c38e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
@@ -0,0 +1,7 @@
+package org.apache.drill.exec.vector;
+
+
+public interface SerializableVector extends ValueVector{
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 717c087..dfe8e8c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -32,6 +32,8 @@ import org.apache.drill.exec.record.TransferPair;
*/
public interface ValueVector extends Closeable {
+ public int getBufferSize();
+
/**
* Alternative to clear(). Allows use as closeable in try-with-resources.
*/
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
index 7668bdc..9fd33b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
import org.apache.drill.exec.work.foreman.ErrorHelper;
-public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
+public abstract class AbstractFragmentRunnerListener implements FragmentRunnerListener{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentRunnerListener.class);
private FragmentContext context;
@@ -90,9 +90,9 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
statusChange(handle, statusBuilder.build());
}
- protected void statusChange(FragmentHandle handle, FragmentStatus status){
+ protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
- }
+
@Override
public final void fail(FragmentHandle handle, String message, Throwable excep) {
@@ -103,7 +103,6 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
statusChange(handle, statusBuilder.build());
- // TODO: ensure the foreman handles the exception
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
index 74fcd2b..ef7bcb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
@@ -38,8 +38,10 @@ public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListen
this.tunnel = tunnel;
}
+
@Override
protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+ logger.debug("Sending remote failure.");
tunnel.sendFragmentStatus(status);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index d4c4014..9a33109 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -31,11 +31,24 @@ public class ErrorHelper {
DrillPBError.Builder builder = DrillPBError.newBuilder();
builder.setEndpoint(endpoint);
builder.setErrorId(id);
+ StringBuilder sb = new StringBuilder();
if(message != null){
- builder.setMessage(message);
- }else{
- builder.setMessage(t.getMessage());
+ sb.append(message);
}
+
+ do{
+ sb.append(" < ");
+ sb.append(t.getClass().getSimpleName());
+ if(t.getMessage() != null){
+ sb.append(":[ ");
+ sb.append(t.getMessage());
+ sb.append(" ]");
+ }
+ }while(t.getCause() != null && t.getCause() != t);
+
+ builder.setMessage(sb.toString());
+
+
builder.setErrorType(0);
// record the error to the log for later reference.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 65fc8c7..af91a6b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.rpc.RpcException;
@@ -61,6 +62,7 @@ class RunningFragmentManager implements FragmentStatusListener{
private ForemanManagerListener foreman;
private AtomicInteger remainingFragmentCount;
private FragmentRunner rootRunner;
+ private volatile QueryId queryId;
public RunningFragmentManager(ForemanManagerListener foreman, TunnelManager tun) {
super();
@@ -72,6 +74,7 @@ class RunningFragmentManager implements FragmentStatusListener{
public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
remainingFragmentCount.set(leafFragments.size()+1);
+ queryId = rootFragment.getHandle().getQueryId();
// set up the root fragment first so we'll have incoming buffers available.
{
@@ -146,7 +149,7 @@ class RunningFragmentManager implements FragmentStatusListener{
private void fail(FragmentStatus status){
updateStatus(status);
stopQuery();
- QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.FAILED).build();
+ QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
foreman.cleanupAndSendResult(result);
}
@@ -262,7 +265,7 @@ class RunningFragmentManager implements FragmentStatusListener{
@Override
protected void statusChange(FragmentHandle handle, FragmentStatus status) {
- RunningFragmentManager.this.updateStatus(status);
+ RunningFragmentManager.this.statusUpdate(status);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
new file mode 100644
index 0000000..a41a883
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
@@ -0,0 +1,181 @@
+package org.apache.drill.exec.expr;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+
+public final class EscapeTest1 {
+
+ public static class Timer{
+ long n1;
+ String name;
+ public Timer(String name){
+ this.n1 = System.nanoTime();
+ this.name = name;
+ }
+
+ public void print(long sum){
+ System.out.println(String.format("Completed %s in %d ms. Output was %d", name, (System.nanoTime() - n1)/1000/1000, sum));
+ }
+ }
+
+ public static Timer time(String name){
+ return new Timer(name);
+ }
+
+ public static void main(String args[]){
+ EscapeTest1 et = new EscapeTest1();
+ Monkey m = new Monkey();
+ for(int i =0; i < 10; i++){
+ time("noalloc").print(et.noAlloc());
+ time("alloc").print(et.alloc());
+ time("set noalloc").print(et.setNoAlloc(m));
+ time("set alloc").print(et.setAlloc(m));
+ time("get noalloc").print(et.getNoAlloc(m));
+ time("get alloc").print(et.getAlloc(m));
+ time("get return alloc").print(et.getReturnAlloc(m));
+ }
+ }
+
+ public long noAlloc(){
+ long sum = 0;
+ for(int i =0; i < 1000000000; i++){
+ sum+= add(i+1, i+2);
+ }
+ return sum;
+ }
+
+ public long alloc(){
+ long sum = 0;
+ for(int i =0; i < 1000000000; i++){
+ Ad ad = new Ad(i+1, i+2);
+ sum+= add(ad.x, ad.y);
+ }
+ return sum;
+ }
+
+ public long setAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ EH h = new EH(i+1, i+2);
+ m.set(h);
+ sum += i;
+ }
+ return sum;
+ }
+
+ public long setNoAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ m.set(i+1, i+2);
+ sum += i;
+ }
+ return sum;
+ }
+
+ public long getAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ RR r = new RR();
+ m.get(i, i+1, r);
+ sum += r.v1 + r.v2;
+ }
+ return sum;
+ }
+
+ public long getNoAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ int i1 = m.getV1(i);
+ int i2 = m.getV2(i+1);
+ sum += i1 + i2;
+ }
+ return sum;
+ }
+
+ public long getReturnAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ RR r = m.get(i, i+1);
+ sum += r.v1 + r.v2;
+ }
+ return sum;
+ }
+
+
+ public class Ad{
+ long x;
+ long y;
+ public Ad(long x, long y) {
+ super();
+ this.x = x;
+ this.y = y;
+ }
+ }
+
+
+ public static final class EH{
+ int index;
+ int value;
+ public EH(int index, int value) {
+ super();
+ this.index = index;
+ this.value = value;
+ }
+ }
+
+ public static final class RR{
+ int v1;
+ int v2;
+
+ public RR(){
+
+ }
+ public RR(int v1, int v2) {
+ super();
+ this.v1 = v1;
+ this.v2 = v2;
+ }
+ }
+
+ public long add(long a, long b){
+ return a + b;
+ }
+
+
+ public final static class Monkey{
+ final IntBuffer buf;
+
+ public Monkey(){
+ ByteBuffer bb = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
+ buf = bb.asIntBuffer();
+ }
+
+ public final void set(int index, int value){
+ buf.put(index, value);
+ }
+
+ public final void set(EH a){
+ buf.put(a.index, a.value);
+ }
+
+ public final int getV1(int index){
+ return buf.get(index);
+ }
+
+ public final int getV2(int index){
+ return buf.get(index);
+ }
+
+ public final RR get(int index1, int index2){
+ return new RR(buf.get(index1), buf.get(index2));
+ }
+
+ public final void get(int index1, int index2, RR rr){
+ rr.v1 = buf.get(index1);
+ rr.v2 = buf.get(index2);
+ }
+
+ }
+
+
+}