You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/28 02:30:38 UTC
[3/5] DRILL-185: Extend code/clean code generator and add first
aggregate functions.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java
new file mode 100644
index 0000000..25e3675
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggTemplate.java
@@ -0,0 +1,288 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.AggBatch.AggOutcome;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+public abstract class AggTemplate implements Aggregator {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Aggregator.class);
+ private static final boolean EXTRA_DEBUG = false;
+ private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
+ private IterOutcome lastOutcome = null;
+ private boolean first = true;
+ private boolean newSchema = false;
+ private int previousIndex = 0;
+ private int underlyingIndex = 0;
+ private int currentIndex;
+ private int addedRecordCount = 0;
+ private boolean pendingOutput = false;
+ private IterOutcome outcome;
+ private int outputCount = 0;
+ private RecordBatch incoming;
+ private BatchSchema schema;
+ private RecordBatch outgoing;
+ private VectorAllocator[] allocators;
+ private FragmentContext context;
+ private InternalBatch remainderBatch;
+
+
+ @Override
+ public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException {
+ this.allocators = allocators;
+ this.context = context;
+ this.incoming = incoming;
+ this.schema = incoming.getSchema();
+ this.allocators = allocators;
+ this.outgoing = outgoing;
+ setupInterior(incoming, outgoing);
+ this.currentIndex = this.getVectorIndex(underlyingIndex);
+ }
+
+
+ private void allocateOutgoing() {
+ for (VectorAllocator a : allocators) {
+ if(EXTRA_DEBUG) logger.debug("Allocating {} with {} records.", a, 20000);
+ a.alloc(20000);
+ }
+ }
+
+ @Override
+ public IterOutcome getOutcome() {
+ return outcome;
+ }
+
+ @Override
+ public int getOutputCount() {
+ return outputCount;
+ }
+
+ private AggOutcome tooBigFailure(){
+ context.fail(new Exception(TOO_BIG_ERROR));
+ this.outcome = IterOutcome.STOP;
+ return AggOutcome.CLEANUP_AND_RETURN;
+ }
+
+ @Override
+ public AggOutcome doWork() {
+ try{ // outside loop to ensure that first is set to false after the first run.
+
+ // if we're in the first state, allocate outgoing.
+ if(first){
+ allocateOutgoing();
+ }
+
+ // pick up a remainder batch if we have one.
+ if(remainderBatch != null){
+ if (!outputToBatch( previousIndex )) return tooBigFailure();
+ remainderBatch.clear();
+ remainderBatch = null;
+ return setOkAndReturn();
+ }
+
+
+ // setup for new output and pick any remainder.
+ if (pendingOutput) {
+ allocateOutgoing();
+ pendingOutput = false;
+ if (!outputToBatch( previousIndex)) return tooBigFailure();
+ }
+
+ if(newSchema){
+ return AggOutcome.UPDATE_AGGREGATOR;
+ }
+
+ if(lastOutcome != null){
+ outcome = lastOutcome;
+ return AggOutcome.CLEANUP_AND_RETURN;
+ }
+
+ outside: while(true){
+ // loop through existing records, adding as necessary.
+ for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
+ if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ if (isSame( previousIndex, currentIndex )) {
+ if(EXTRA_DEBUG) logger.debug("Values were found the same, adding.");
+ addRecordInc(currentIndex);
+ } else {
+ if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch.");
+ if (outputToBatch(previousIndex)) {
+ if(EXTRA_DEBUG) logger.debug("Output successful.");
+ addRecordInc(currentIndex);
+ } else {
+ if(EXTRA_DEBUG) logger.debug("Output failed.");
+ if(outputCount == 0) return tooBigFailure();
+
+ // mark the pending output but move forward for the next cycle.
+ pendingOutput = true;
+ previousIndex = currentIndex;
+ incIndex();
+ return setOkAndReturn();
+
+ }
+ }
+ previousIndex = currentIndex;
+ }
+
+
+ InternalBatch previous = null;
+
+ try{
+ while(true){
+ previous = new InternalBatch(incoming);
+ IterOutcome out = incoming.next();
+ if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
+ switch(out){
+ case NONE:
+ lastOutcome = out;
+ if(addedRecordCount > 0){
+ if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
+ if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
+ return setOkAndReturn();
+ }else{
+ outcome = out;
+ return AggOutcome.CLEANUP_AND_RETURN;
+ }
+
+
+
+ case NOT_YET:
+ this.outcome = out;
+ return AggOutcome.RETURN_OUTCOME;
+
+ case OK_NEW_SCHEMA:
+ if(EXTRA_DEBUG) logger.debug("Received new schema.");
+ if(addedRecordCount > 0){
+ if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
+ if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning.");
+ newSchema = true;
+ return setOkAndReturn();
+ }
+ cleanup();
+ return AggOutcome.UPDATE_AGGREGATOR;
+ case OK:
+ resetIndex();
+ if(incoming.getRecordCount() == 0){
+ continue;
+ }else{
+ if(isSamePrev(previousIndex , previous, currentIndex)){
+ if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
+ addRecordInc(currentIndex);
+ previousIndex = currentIndex;
+ incIndex();
+ if(EXTRA_DEBUG) logger.debug("Continuing outside");
+ continue outside;
+ }else{ // not the same
+ if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside.");
+ previousIndex = currentIndex;
+ if(addedRecordCount > 0){
+ if( !outputToBatchPrev( previous, previousIndex, outputCount) ){
+ remainderBatch = previous;
+ return setOkAndReturn();
+ }
+ continue outside;
+ }
+ }
+ }
+ case STOP:
+ default:
+ lastOutcome = out;
+ outcome = out;
+ return AggOutcome.CLEANUP_AND_RETURN;
+ }
+
+
+ }
+ }finally{
+ // make sure to clear previous if we haven't saved it.
+ if(remainderBatch == null && previous != null){
+ previous.clear();
+ }
+ }
+ }
+ }finally{
+ if(first) first = !first;
+ }
+
+ }
+
+
+ private final void incIndex(){
+ underlyingIndex++;
+ if(underlyingIndex >= incoming.getRecordCount()){
+ currentIndex = Integer.MAX_VALUE;
+ return;
+ }
+ currentIndex = getVectorIndex(underlyingIndex);
+ }
+
+ private final void resetIndex(){
+ underlyingIndex = -1;
+ incIndex();
+ }
+
+ private final AggOutcome setOkAndReturn(){
+ if(first){
+ this.outcome = IterOutcome.OK_NEW_SCHEMA;
+ }else{
+ this.outcome = IterOutcome.OK;
+ }
+ for(VectorWrapper<?> v : outgoing){
+ v.getValueVector().getMutator().setValueCount(outputCount);
+ }
+ return AggOutcome.RETURN_OUTCOME;
+ }
+
+ private final boolean outputToBatch(int inIndex){
+ boolean success = outputRecordKeys(inIndex, outputCount) //
+ && outputRecordValues(outputCount) //
+ && resetValues();
+ if(success){
+ if(EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount);
+ outputCount++;
+ addedRecordCount = 0;
+ }
+
+ return success;
+ }
+
+ private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){
+ boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
+ && outputRecordValues(outIndex) //
+ && resetValues();
+ if(success){
+ outputCount++;
+ addedRecordCount = 0;
+ }
+
+ return success;
+ }
+
+ private void addRecordInc(int index){
+ addRecord(index);
+ this.addedRecordCount++;
+ }
+
+ @Override
+ public void cleanup(){
+ if(remainderBatch != null) remainderBatch.clear();
+ }
+
+
+ public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
+ public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
+ public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
+ public abstract void addRecord(@Named("index") int index);
+ public abstract boolean outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
+ public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+ public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+ public abstract boolean resetValues();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java
new file mode 100644
index 0000000..fdda8c8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/Aggregator.java
@@ -0,0 +1,26 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.AggBatch.AggOutcome;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+public interface Aggregator {
+
+ public static TemplateClassDefinition<Aggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<Aggregator>(Aggregator.class, AggTemplate.class);
+
+ public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+ VectorAllocator[] allocators) throws SchemaChangeException;
+
+ public abstract IterOutcome getOutcome();
+
+ public abstract int getOutputCount();
+
+ public abstract AggOutcome doWork();
+
+ public abstract void cleanup();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
new file mode 100644
index 0000000..2f9cabe
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java
@@ -0,0 +1,9 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+public interface BatchIterator {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchIterator.class);
+
+ public IterOutcome next();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
new file mode 100644
index 0000000..343dbe5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -0,0 +1,65 @@
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class InternalBatch implements Iterable<VectorWrapper<?>>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class);
+
+ private final VectorContainer container;
+ private final BatchSchema schema;
+ private final SelectionVector2 sv2;
+ private final SelectionVector4 sv4;
+
+ public InternalBatch(RecordBatch incoming){
+ switch(incoming.getSchema().getSelectionVectorMode()){
+ case FOUR_BYTE:
+ this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
+ this.sv2 = null;
+ break;
+ case TWO_BYTE:
+ this.sv4 = null;
+ this.sv2 = incoming.getSelectionVector2().clone();
+ break;
+ default:
+ this.sv4 = null;
+ this.sv2 = null;
+ }
+ this.schema = incoming.getSchema();
+ this.container = VectorContainer.getTransferClone(incoming);
+ }
+
+ public BatchSchema getSchema() {
+ return schema;
+ }
+
+ public SelectionVector2 getSv2() {
+ return sv2;
+ }
+
+ public SelectionVector4 getSv4() {
+ return sv4;
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ public void clear(){
+ if(sv2 != null) sv2.clear();
+ if(sv4 != null) sv4.clear();
+ container.clear();
+ }
+
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
+ return container.getVectorAccessor(fieldId, clazz);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
deleted file mode 100644
index 0fad224..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterEvaluator.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.drill.exec.physical.impl.filter;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface FilterEvaluator {
- public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public boolean doEval(int inIndex, int outIndex);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
index 18aa484..a7e8f0c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
@@ -1,5 +1,7 @@
package org.apache.drill.exec.physical.impl.filter;
+import javax.inject.Named;
+
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -78,6 +80,7 @@ public abstract class FilterTemplate implements Filterer{
outgoingSelectionVector.setRecordCount(svIndex);
}
- protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- protected abstract boolean doEval(int inIndex, int outIndex);
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index fb08ef3..fec147f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -12,7 +12,6 @@ public interface Filterer {
public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
public void filterBatch(int recordCount);
- public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>( //
- Filterer.class, "org.apache.drill.exec.physical.impl.filter.FilterTemplate", FilterEvaluator.class, FilterSignature.class);
+ public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index bc1ef4e..0cefc52 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -19,25 +19,33 @@
package org.apache.drill.exec.physical.impl.partitionsender;
import java.util.Iterator;
-import java.util.List;
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.bit.BitTunnel;
-import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
import org.apache.drill.exec.work.foreman.ErrorHelper;
+import com.google.common.base.Preconditions;
+
/**
* OutgoingRecordBatch is a holder of value vectors which are to be sent to another host. Thus,
* next() will never be called on this object. When a record batch is ready to send (e.g. nearing size
@@ -121,9 +129,9 @@ public class OutgoingRecordBatch implements RecordBatch {
// NOTE: the value vector is directly referenced by generated code; therefore references
// must remain valid.
recordCount = 0;
- for (VectorWrapper v : vectorContainer) {
+ for (VectorWrapper<?> v : vectorContainer) {
logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush.");
- getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
+ VectorAllocator.getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount());
}
if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
return true;
@@ -139,14 +147,14 @@ public class OutgoingRecordBatch implements RecordBatch {
vectorContainer = new VectorContainer();
SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
- for (VectorWrapper v : incoming) {
+ for (VectorWrapper<?> v : incoming) {
// add field to the output schema
bldr.addField(v.getField());
// allocate a new value vector
ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
- getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity);
+ VectorAllocator.getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity);
vectorContainer.add(outgoingVector);
logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
}
@@ -162,7 +170,7 @@ public class OutgoingRecordBatch implements RecordBatch {
isLast = false;
recordCount = 0;
recordCapacity = 0;
- for (VectorWrapper v : vectorContainer)
+ for (VectorWrapper<?> v : vectorContainer)
v.getValueVector().clear();
}
@@ -227,49 +235,6 @@ public class OutgoingRecordBatch implements RecordBatch {
return WritableBatch.get(this);
}
- private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
- if(outgoing instanceof FixedWidthVector){
- return new FixedVectorAllocator((FixedWidthVector) outgoing);
- }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
- return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
- }else{
- throw new UnsupportedOperationException();
- }
- }
-
- private class FixedVectorAllocator implements VectorAllocator{
- FixedWidthVector out;
-
- public FixedVectorAllocator(FixedWidthVector out) {
- super();
- this.out = out;
- }
-
- public void alloc(int recordCount){
- out.allocateNew(recordCount);
- out.getMutator().setValueCount(recordCount);
- }
- }
-
- private class VariableVectorAllocator implements VectorAllocator{
- VariableWidthVector in;
- VariableWidthVector out;
-
- public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
- super();
- this.in = in;
- this.out = out;
- }
-
- public void alloc(int recordCount){
- out.allocateNew(in.getByteCapacity(), recordCount);
- out.getMutator().setValueCount(recordCount);
- }
- }
-
- public interface VectorAllocator{
- public void alloc(int recordCount);
- }
private StatusHandler statusHandler = new StatusHandler();
private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 992ffdf..aa96d3f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -31,9 +31,5 @@ public interface Partitioner {
public abstract void partitionBatch(RecordBatch incoming);
- public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION =
- new TemplateClassDefinition<>(Partitioner.class,
- "org.apache.drill.exec.physical.impl.partitionsender.PartitionerTemplate",
- PartitionerEvaluator.class,
- PartitionerInnerSignature.class);
+ public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java
deleted file mode 100644
index 8c92fdc..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerEvaluator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface PartitionerEvaluator {
- public void doSetup(FragmentContext context, RecordBatch incoming, OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
- public void doEval(int inIndex, int outIndex);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 7198c3a..4fdd740 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.physical.impl.partitionsender;
+import javax.inject.Named;
+
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
@@ -51,7 +53,8 @@ public abstract class PartitionerTemplate implements Partitioner {
}
- protected abstract void doSetup(FragmentContext context, RecordBatch incoming, OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
- protected abstract void doEval(int inIndex, int outIndex);
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+ public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
deleted file mode 100644
index 75632e7..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.drill.exec.physical.impl.project;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface ProjectEvaluator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
-
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract void doEval(int inIndex, int outIndex) throws SchemaChangeException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 8f06290..c44d988 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -5,12 +5,8 @@ import java.util.List;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
@@ -19,14 +15,12 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.TypeHelper;
@@ -76,7 +70,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
for(int i =0; i < exprs.size(); i++){
final NamedExpression namedExpression = exprs.get(i);
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector);
- final MaterializedField outputField = getMaterializedField(namedExpression.getRef(), expr);
+ final MaterializedField outputField = MaterializedField.create(namedExpression.getRef(), expr.getMajorType());
if(collector.hasErrors()){
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
@@ -87,7 +81,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
Preconditions.checkNotNull(incoming);
- TransferPair tp = vvIn.getTransferPair();
+ TransferPair tp = vvIn.getTransferPair(namedExpression.getRef());
transfers.add(tp);
container.add(tp.getTo());
logger.debug("Added transfer.");
@@ -95,7 +89,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
// need to do evaluation.
ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
allocationVectors.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(container.add(vector), expr);
+ TypedFieldId fid = container.add(vector);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
cg.addExpr(write);
logger.debug("Added eval.");
}
@@ -112,29 +107,5 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
}
- private MaterializedField getMaterializedField(FieldReference reference, LogicalExpression expr){
- return new MaterializedField(getFieldDef(reference, expr.getMajorType()));
- }
-
- private FieldDef getFieldDef(SchemaPath path, MajorType type){
- return FieldDef //
- .newBuilder() //
- .addAllName(getNameParts(path.getRootSegment())) //
- .setMajorType(type) //
- .build();
- }
-
- private List<NamePart> getNameParts(PathSegment seg){
- List<NamePart> parts = Lists.newArrayList();
- while(seg != null){
- if(seg.isArray()){
- parts.add(NamePart.newBuilder().setType(Type.ARRAY).build());
- }else{
- parts.add(NamePart.newBuilder().setType(Type.NAME).setName(seg.getNameSegment().getPath().toString()).build());
- }
- seg = seg.getChild();
- }
- return parts;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index ba83e61..327f8b8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -3,7 +3,6 @@ package org.apache.drill.exec.physical.impl.project;
import java.util.List;
import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.compile.sig.DefaultGeneratorSignature;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
@@ -12,11 +11,8 @@ import org.apache.drill.exec.record.TransferPair;
public interface Projector {
public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
-
-
public abstract int projectRecords(int recordCount, int firstOutputIndex);
- public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>( //
- Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class);
+ public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 5f15c2d..0f4b90d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -2,6 +2,8 @@ package org.apache.drill.exec.physical.impl.project;
import java.util.List;
+import javax.inject.Named;
+
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -71,8 +73,9 @@ public abstract class ProjectorTemplate implements Projector {
doSetup(context, incoming, outgoing);
}
- protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- protected abstract void doEval(int inIndex, int outIndex);
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
deleted file mode 100644
index 86d0d61..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.drill.exec.physical.impl.sort;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface Comparator {
-
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract int doEval(int inIndex, int outIndex);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
deleted file mode 100644
index 02fffa5..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package org.apache.drill.exec.physical.impl.sort;
-
-import java.util.List;
-
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.IfExpression;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.QuotedString;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.exec.expr.ValueVectorReadExpression;
-
-import com.google.common.collect.Lists;
-
-public class ReadIndexRewriter implements ExprVisitor<LogicalExpression, String, RuntimeException> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReadIndexRewriter.class);
-
- private String batchName;
-
-
- @Override
- public LogicalExpression visitUnknown(LogicalExpression e, String newIndexName) {
- if (e instanceof ValueVectorReadExpression) {
- ValueVectorReadExpression old = (ValueVectorReadExpression) e;
- return new ValueVectorReadExpression(old.getTypedFieldId(), newIndexName);
- } else {
- throw new UnsupportedOperationException(String.format(
- "ReadIndex rewriter doesn't know how to rewrite expression of type %s.", e.getClass().getName()));
- }
- }
-
- @Override
- public LogicalExpression visitFunctionCall(FunctionCall call, String newIndexName) {
- List<LogicalExpression> args = Lists.newArrayList();
- for (int i = 0; i < call.args.size(); ++i) {
- LogicalExpression newExpr = call.args.get(i).accept(this, null);
- args.add(newExpr);
- }
-
- return new FunctionCall(call.getDefinition(), args, call.getPosition());
- }
-
- @Override
- public LogicalExpression visitIfExpression(IfExpression ifExpr, String newIndexName) {
- List<IfExpression.IfCondition> conditions = Lists.newArrayList(ifExpr.conditions);
- LogicalExpression newElseExpr = ifExpr.elseExpression.accept(this, null);
-
- for (int i = 0; i < conditions.size(); ++i) {
- IfExpression.IfCondition condition = conditions.get(i);
-
- LogicalExpression newCondition = condition.condition.accept(this, null);
- LogicalExpression newExpr = condition.expression.accept(this, null);
- conditions.set(i, new IfExpression.IfCondition(newCondition, newExpr));
- }
-
- return IfExpression.newBuilder().setElse(newElseExpr).addConditions(conditions).build();
- }
-
- @Override
- public LogicalExpression visitSchemaPath(SchemaPath path, String newIndexName) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public LogicalExpression visitLongConstant(LongExpression intExpr, String value) throws RuntimeException {
- return intExpr;
- }
-
- @Override
- public LogicalExpression visitDoubleConstant(DoubleExpression dExpr, String value) throws RuntimeException {
- return dExpr;
- }
-
- @Override
- public LogicalExpression visitBooleanConstant(BooleanExpression e, String value) throws RuntimeException {
- return e;
- }
-
- @Override
- public LogicalExpression visitQuotedStringConstant(QuotedString e, String value) throws RuntimeException {
- return e;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index a21af09..7e44429 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -14,7 +14,7 @@ import com.google.common.collect.Lists;
/**
* Holds the data for a particular record batch for later manipulation.
*/
-class RecordBatchData {
+public class RecordBatchData {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);
final List<ValueVector> vectors = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index c9bd55d..8cffda3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -9,6 +9,7 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
@@ -32,6 +33,10 @@ import com.sun.codemodel.JExpr;
public class SortBatch extends AbstractRecordBatch<Sort> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
+ public static final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ public static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+ public static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, CodeGenerator.DEFAULT_SCALAR_MAP, CodeGenerator.DEFAULT_SCALAR_MAP);
+
private static long MAX_SORT_BYTES = 8l * 1024 * 1024 * 1024;
private final RecordBatch incoming;
@@ -47,7 +52,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
@Override
public int getRecordCount() {
- return sv4.getLength();
+ return sv4.getCount();
}
@Override
@@ -127,18 +132,18 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
private Sorter createNewSorter() throws ClassTransformationException, IOException, SchemaChangeException{
CodeGenerator<Sorter> g = new CodeGenerator<Sorter>(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- g.setMappingSet(SortSignature.MAIN_MAPPING);
+ g.setMappingSet(MAIN_MAPPING);
for(OrderDef od : popConfig.getOrderings()){
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), this, collector);
if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- g.setMappingSet(SortSignature.LEFT_MAPPING);
+ g.setMappingSet(LEFT_MAPPING);
HoldingContainer left = g.addExpr(expr, false);
- g.setMappingSet(SortSignature.RIGHT_MAPPING);
+ g.setMappingSet(RIGHT_MAPPING);
HoldingContainer right = g.addExpr(expr, false);
- g.setMappingSet(SortSignature.MAIN_MAPPING);
+ g.setMappingSet(MAIN_MAPPING);
// next we wrap the two comparison sides and add the expression block for the comparison.
FunctionCall f = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of((LogicalExpression) new HoldingContainerExpression(left), new HoldingContainerExpression(right)), ExpressionPosition.UNKNOWN);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java
deleted file mode 100644
index 7614f3e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortSignature.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.drill.exec.physical.impl.sort;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
-import org.apache.drill.exec.compile.sig.DefaultGeneratorSignature;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface SortSignature extends CodeGeneratorSignature{
-
- public static final MappingSet MAIN_MAPPING = new MappingSet("null", "null", DefaultGeneratorSignature.DEFAULT_SCALAR_MAP, DefaultGeneratorSignature.DEFAULT_SCALAR_MAP);
- public static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", "null", DefaultGeneratorSignature.DEFAULT_SCALAR_MAP, DefaultGeneratorSignature.DEFAULT_SCALAR_MAP);
- public static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", "null", DefaultGeneratorSignature.DEFAULT_SCALAR_MAP, DefaultGeneratorSignature.DEFAULT_SCALAR_MAP);
-
- public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
- public int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index d312fb4..ddd066f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -1,5 +1,7 @@
package org.apache.drill.exec.physical.impl.sort;
+import javax.inject.Named;
+
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
@@ -40,6 +42,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{
return doEval(sv1, sv2);
}
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract int doEval(int leftIndex, int rightIndex);
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
index 1a76423..9cf98e3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -11,7 +11,6 @@ public interface Sorter {
public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException;
public void sort(SelectionVector4 vector4, VectorContainer container);
- public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>( //
- Sorter.class, "org.apache.drill.exec.physical.impl.sort.SortTemplate", Comparator.class, SortSignature.class);
+ public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 363bbee..7495e31 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -1,18 +1,14 @@
package org.apache.drill.exec.physical.impl.svremover;
import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.compile.sig.DefaultGeneratorSignature;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
public interface Copier {
- public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>( //
- Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate2", CopyEvaluator.class);
-
- public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>( //
- Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate4", CopyEvaluator.class);
+ public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class);
+ public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class);
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
public abstract void copyRecords();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 4dc38f2..1f99f04 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -1,10 +1,12 @@
package org.apache.drill.exec.physical.impl.svremover;
+import javax.inject.Named;
+
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
public abstract class CopierTemplate2 implements Copier{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
@@ -36,8 +38,9 @@ public abstract class CopierTemplate2 implements Copier{
}
}
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract void doEval(int incoming, int outgoing);
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index 2cf033e..6bf952d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -1,11 +1,13 @@
package org.apache.drill.exec.physical.impl.svremover;
+import javax.inject.Named;
+
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
public abstract class CopierTemplate4 implements Copier{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
@@ -30,18 +32,18 @@ public abstract class CopierTemplate4 implements Copier{
@Override
public void copyRecords(){
- final int recordCount = sv4.getLength();
+ final int recordCount = sv4.getCount();
allocateVectors(recordCount);
int outgoingPosition = 0;
- final int end = sv4.getStart() + sv4.getLength();
- for(int svIndex = sv4.getStart(); svIndex < end; svIndex++, outgoingPosition++){
+ for(int svIndex = 0; svIndex < sv4.getCount(); svIndex++, outgoingPosition++){
int deRefIndex = sv4.get(svIndex);
doEval(deRefIndex, outgoingPosition);
}
}
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract void doEval(int incoming, int outgoing);
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java
deleted file mode 100644
index 25c51bb..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopyEvaluator.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.drill.exec.physical.impl.svremover;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-
-public interface CopyEvaluator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopyEvaluator.class);
-
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing);
- public abstract void doEval(int incoming, int outgoing);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index e4fd9a0..521061c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -19,14 +19,14 @@ import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.allocator.FixedVectorAllocator;
+import org.apache.drill.exec.vector.allocator.VariableEstimatedVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.sun.codemodel.JBlock;
-import com.sun.codemodel.JClass;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
@@ -118,7 +118,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
for(VectorWrapper<?> i : incoming){
ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
container.add(v);
- allocators.add(getAllocator(i.getValueVector(), v));
+ allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v));
}
try {
@@ -203,62 +203,4 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
- private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
- if(outgoing instanceof FixedWidthVector){
- return new FixedVectorAllocator((FixedWidthVector) outgoing);
- }else if(outgoing instanceof VariableWidthVector && in instanceof VariableWidthVector){
- return new VariableVectorAllocator( (VariableWidthVector) in, (VariableWidthVector) outgoing);
- }else{
- throw new UnsupportedOperationException();
- }
- }
-
- private class FixedVectorAllocator implements VectorAllocator{
- FixedWidthVector out;
-
- public FixedVectorAllocator(FixedWidthVector out) {
- super();
- this.out = out;
- }
-
- public void alloc(int recordCount){
- out.allocateNew(recordCount);
- out.getMutator().setValueCount(recordCount);
- }
- }
-
- private class VariableEstimatedVector implements VectorAllocator{
- VariableWidthVector out;
- int avgWidth;
-
- public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
- super();
- this.out = out;
- this.avgWidth = avgWidth;
- }
-
- public void alloc(int recordCount){
- out.allocateNew(avgWidth * recordCount, recordCount);
- out.getMutator().setValueCount(recordCount);
- }
- }
- private class VariableVectorAllocator implements VectorAllocator{
- VariableWidthVector in;
- VariableWidthVector out;
-
- public VariableVectorAllocator(VariableWidthVector in, VariableWidthVector out) {
- super();
- this.in = in;
- this.out = out;
- }
-
- public void alloc(int recordCount){
- out.allocateNew(in.getByteCapacity(), recordCount);
- out.getMutator().setValueCount(recordCount);
- }
- }
-
- public interface VectorAllocator{
- public void alloc(int recordCount);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index e8a5cf8..bbab867 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -1,7 +1,10 @@
package org.apache.drill.exec.record;
+import java.lang.reflect.Array;
+
import org.apache.drill.exec.vector.ValueVector;
+
public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
@@ -48,6 +51,18 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
}
+ @Override
+ @SuppressWarnings("unchecked")
+ public VectorWrapper<T> cloneAndTransfer() {
+ T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length);
+ for(int i =0; i < newVectors.length; i++){
+ TransferPair tp = vectors[i].getTransferPair();
+ tp.transfer();
+ newVectors[i] = (T) tp.getTo();
+ }
+ return new HyperVectorWrapper<T>(f, newVectors);
+ }
+
public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v){
return new HyperVectorWrapper<T>(f, v);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index abde61a..d974441 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -18,7 +18,10 @@
package org.apache.drill.exec.record;
import java.util.Iterator;
+import java.util.List;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -28,6 +31,8 @@ import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
import org.apache.drill.exec.vector.TypeHelper;
+import com.google.common.collect.Lists;
+
public class MaterializedField{
private final FieldDef def;
@@ -39,6 +44,10 @@ public class MaterializedField{
return new MaterializedField(def);
}
+ public MaterializedField clone(FieldReference ref){
+ return create(ref, def.getMajorType());
+ }
+
public static MaterializedField create(SchemaPath path, MajorType type) {
FieldDef.Builder b = FieldDef.newBuilder();
b.setMajorType(type);
@@ -172,5 +181,4 @@ public class MaterializedField{
return "MaterializedField [" + def.toString() + "]";
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 94700a2..62ca8a4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -38,6 +38,14 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public VectorWrapper<T> cloneAndTransfer() {
+ TransferPair tp = v.getTransferPair();
+ tp.transfer();
+ return new SimpleVectorWrapper<T>((T) tp.getTo());
+ }
+
@Override
public void release() {
v.clear();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index a90e2d8..2035cac 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -5,4 +5,5 @@ import org.apache.drill.exec.vector.ValueVector;
public interface TransferPair {
public void transfer();
public ValueVector getTo();
+ public void copyValue(int from, int to);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 57aad79..25036fc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -29,16 +29,35 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
add(vArr);
}
}
-
- public void addHyperList(List<ValueVector> vectors){
+
+ public void addHyperList(List<ValueVector> vectors) {
schema = null;
ValueVector[] vv = new ValueVector[vectors.size()];
- for(int i =0; i < vv.length; i++){
+ for (int i = 0; i < vv.length; i++) {
vv[i] = vectors.get(i);
}
add(vv);
}
+ /**
+ * Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned
+ * container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code.
+ *
+ * @param incoming The RecordBatch iterator the contains the batch we should take over.
+ * @return A cloned vector container.
+ */
+ public static VectorContainer getTransferClone(RecordBatch incoming) {
+ VectorContainer vc = new VectorContainer();
+ for (VectorWrapper<?> w : incoming) {
+ vc.cloneAndTransfer(w);
+ }
+ return vc;
+ }
+
+ private void cloneAndTransfer(VectorWrapper<?> wrapper) {
+ wrappers.add(wrapper.cloneAndTransfer());
+ }
+
public void addCollection(Iterable<ValueVector> vectors) {
schema = null;
for (ValueVector vv : vectors) {
@@ -101,11 +120,13 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
return (VectorWrapper<T>) va;
}
- public BatchSchema getSchema(){
- Preconditions.checkNotNull(schema, "Schema is currently null. You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
+ public BatchSchema getSchema() {
+ Preconditions
+ .checkNotNull(schema,
+ "Schema is currently null. You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
return schema;
}
-
+
public void buildSchema(SelectionVectorMode mode) {
SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
for (VectorWrapper<?> v : wrappers) {
@@ -121,7 +142,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
public void clear() {
// TODO: figure out a better approach for this.
- // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no data.
+ // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no
+ // data.
// schema = null;
for (VectorWrapper<?> w : wrappers) {
w.release();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index e40dee4..1c5308e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -11,4 +11,5 @@ public interface VectorWrapper<T extends ValueVector> {
public T[] getValueVectors();
public boolean isHyper();
public void release();
+ public VectorWrapper<T> cloneAndTransfer();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 2020f92..a778b1d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -42,7 +42,7 @@ public class SelectionVector4 {
return recordCount;
}
- public int getCurrentCount(){
+ public int getCount(){
return length;
}
@@ -54,15 +54,23 @@ public class SelectionVector4 {
}
public int get(int index){
- return vector.getInt(index*4);
+ return vector.getInt( (start+index)*4);
}
-
- public int getStart() {
- return start;
- }
-
- public int getLength() {
- return length;
+
+ /**
+ * Caution: This method shares the underlying buffer between this vector and the newly created one.
+ * @return Newly created single batch SelectionVector4.
+ * @throws SchemaChangeException
+ */
+ public SelectionVector4 createNewWrapperCurrent(){
+ try {
+ vector.retain();
+ SelectionVector4 sv4 = new SelectionVector4(vector, length, length);
+ sv4.start = this.start;
+ return sv4;
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException("This shouldn't happen.");
+ }
}
public boolean next(){
@@ -73,5 +81,9 @@ public class SelectionVector4 {
return true;
}
+ public void clear(){
+ this.vector.clear();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 024aa21..c5375b2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -87,14 +87,15 @@ public class MockRecordReader implements RecordReader {
@Override
public int next() {
+ if(recordsRead >= this.config.getRecords()) return 0;
+
+ int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
- int recordSetSize = Math.min(batchRecordCount, this.config.getRecords()- recordsRead);
-
recordsRead += recordSetSize;
for(ValueVector v : valueVectors){
AllocationHelper.allocate(v, recordSetSize, 50, 5);
- logger.debug("MockRecordReader: Generating random data for VV of type " + v.getClass().getName());
+// logger.debug(String.format("MockRecordReader: Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
ValueVector.Mutator m = v.getMutator();
m.setValueCount(recordSetSize);
m.generateTestData();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 661ceba..358334e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -1,5 +1,6 @@
package org.apache.drill.exec.vector;
+import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
@@ -24,6 +25,10 @@ abstract class BaseValueVector implements ValueVector{
return field;
}
+ public MaterializedField getField(FieldReference ref){
+ return getField().clone(ref);
+ }
+
abstract class BaseAccessor implements ValueVector.Accessor{
public abstract int getValueCount();
public void reset(){}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 77ba98e..8d21298 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -2,12 +2,9 @@ package org.apache.drill.exec.vector;
import io.netty.buffer.ByteBuf;
-import java.util.Random;
-
+import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
@@ -98,10 +95,14 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
return new Accessor();
}
- public TransferPair getTransferPair() {
- return new TransferImpl();
+ public TransferPair getTransferPair(){
+ return new TransferImpl(getField());
+ }
+ public TransferPair getTransferPair(FieldReference ref){
+ return new TransferImpl(getField().clone(ref));
}
+
public void transferTo(BitVector target) {
target.data = data;
target.data.retain();
@@ -112,8 +113,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
private class TransferImpl implements TransferPair {
BitVector to;
- public TransferImpl() {
- this.to = new BitVector(getField(), allocator);
+ public TransferImpl(MaterializedField field) {
+ this.to = new BitVector(field, allocator);
}
public BitVector getTo() {
@@ -123,6 +124,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
public void transfer() {
transferTo(to);
}
+
+ @Override
+ public void copyValue(int fromIndex, int toIndex) {
+ to.copyFrom(fromIndex, toIndex, BitVector.this);
+ }
}
public class Accessor extends BaseAccessor {
@@ -199,6 +205,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
final void set(int index, NullableBitHolder holder) {
set(index, holder.value);
}
+
+ public boolean setSafe(int index, int value) {
+ if(index >= getValueCapacity()) return false;
+ set(index, value);
+ return true;
+ }
public final void setValueCount(int valueCount) {
BitVector.this.valueCount = valueCount;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index dfe8e8c..7d86067 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import java.io.Closeable;
+import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
@@ -58,6 +59,9 @@ public interface ValueVector extends Closeable {
* @return
*/
public TransferPair getTransferPair();
+
+
+ public TransferPair getTransferPair(FieldReference ref);
/**
* Given the current buffer allocation, return the maximum number of values that this buffer can contain.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java
new file mode 100644
index 0000000..987dcf9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/FixedVectorAllocator.java
@@ -0,0 +1,24 @@
+package org.apache.drill.exec.vector.allocator;
+
+import org.apache.drill.exec.vector.FixedWidthVector;
+
+public class FixedVectorAllocator extends VectorAllocator{
+ FixedWidthVector out;
+
+ public FixedVectorAllocator(FixedWidthVector out) {
+ super();
+ this.out = out;
+ }
+
+ public void alloc(int recordCount){
+ out.allocateNew(recordCount);
+ out.getMutator().setValueCount(recordCount);
+ }
+
+ @Override
+ public String toString() {
+ return "FixedVectorAllocator [out=" + out + "]";
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e43093d9/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java
new file mode 100644
index 0000000..7b9f4a7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VariableEstimatedVector.java
@@ -0,0 +1,19 @@
+package org.apache.drill.exec.vector.allocator;
+
+import org.apache.drill.exec.vector.VariableWidthVector;
+
+public class VariableEstimatedVector extends VectorAllocator{
+ VariableWidthVector out;
+ int avgWidth;
+
+ public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
+ super();
+ this.out = out;
+ this.avgWidth = avgWidth;
+ }
+
+ public void alloc(int recordCount){
+ out.allocateNew(avgWidth * recordCount, recordCount);
+ out.getMutator().setValueCount(recordCount);
+ }
+}
\ No newline at end of file