You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:58:02 UTC
[41/53] [abbrv] Types transition
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index 0f4619c..7b76d05 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -18,15 +18,18 @@
package org.apache.drill.exec.physical.config;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MockScanPOP.MockColumn;
import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.NonRepeatedMutator;
@@ -57,12 +60,12 @@ public class MockRecordReader implements RecordReader {
return x;
}
- private ValueVector getVector(int fieldId, String name, MajorType type, int length) {
+ private ValueVector<?> getVector(String name, MajorType type, int length) {
assert context != null : "Context shouldn't be null.";
if(type.getMode() != DataMode.REQUIRED) throw new UnsupportedOperationException();
- MaterializedField f = MaterializedField.create(new SchemaPath(name), fieldId, 0, type);
- ValueVector v;
+ MaterializedField f = MaterializedField.create(new SchemaPath(name, ExpressionPosition.UNKNOWN), type);
+ ValueVector<?> v;
v = TypeHelper.getNewVector(f, context.getAllocator());
if(v instanceof FixedWidthVector){
((FixedWidthVector)v).allocateNew(length);
@@ -85,8 +88,8 @@ public class MockRecordReader implements RecordReader {
batchRecordCount = 250000 / estimateRowSize;
for (int i = 0; i < config.getTypes().length; i++) {
- valueVectors[i] = getVector(i, config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
- output.addField(i, valueVectors[i]);
+ valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
+ output.addField(valueVectors[i]);
}
output.setNewSchema();
} catch (SchemaChangeException e) {
@@ -128,7 +131,7 @@ public class MockRecordReader implements RecordReader {
public void cleanup() {
for (int i = 0; i < valueVectors.length; i++) {
try {
- output.removeField(valueVectors[i].getField().getFieldId());
+ output.removeField(valueVectors[i].getField());
} catch (SchemaChangeException e) {
logger.warn("Failure while trying to remove field.", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
index 40227e5..3802ce2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
@@ -21,6 +21,9 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.ReadEntry;
@@ -29,10 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Scan;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
-import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
-import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.TypeHelper;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
index 6440d98..80d48f4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
@@ -17,12 +17,21 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.InvalidValueAccessor;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.vector.SelectionVector;
+<<<<<<< HEAD
import org.apache.drill.exec.vector.ValueVector;
+=======
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+>>>>>>> Build working
public abstract class FilterRecordBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
@@ -57,8 +66,33 @@ public abstract class FilterRecordBatch implements RecordBatch {
incoming.kill();
}
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return null;
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ return null;
+ }
+
+ @Override
+ public TypedFieldId getValueVector(SchemaPath path) {
+ return null;
+ }
+
+ @Override
+ public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass) {
+ return null;
+ }
+
@Override
+<<<<<<< HEAD
public <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+=======
+ public WritableBatch getWritableBatch() {
+>>>>>>> Build working
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index b3b9f5f..5247d08 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -18,10 +18,19 @@
package org.apache.drill.exec.physical.impl;
import org.apache.drill.exec.exception.SchemaChangeException;
+<<<<<<< HEAD
import org.apache.drill.exec.vector.ValueVector;
public interface OutputMutator {
public void removeField(int fieldId) throws SchemaChangeException;
public void addField(int fieldId, ValueVector vector) throws SchemaChangeException ;
+=======
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public interface OutputMutator {
+ public void removeField(MaterializedField field) throws SchemaChangeException;
+ public void addField(ValueVector<?> vector) throws SchemaChangeException ;
+>>>>>>> Build working
public void setNewSchema() throws SchemaChangeException ;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 2f3e1fe..1e0c000 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -18,20 +18,37 @@
package org.apache.drill.exec.physical.impl;
import java.util.Iterator;
+<<<<<<< HEAD
+=======
+import java.util.List;
+import java.util.Map;
+>>>>>>> Build working
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.WritableBatch;
+<<<<<<< HEAD
+=======
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+>>>>>>> Build working
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.ValueVector;
+<<<<<<< HEAD
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+=======
+import com.beust.jcommander.internal.Lists;
+import com.beust.jcommander.internal.Maps;
+>>>>>>> Build working
/**
* Record batch used for a particular scan. Operators against one or more
@@ -39,7 +56,14 @@ import com.carrotsearch.hppc.procedures.IntObjectProcedure;
public class ScanBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+<<<<<<< HEAD
private IntObjectOpenHashMap<ValueVector> fields = new IntObjectOpenHashMap<ValueVector>();
+=======
+ final List<ValueVector<?>> vectors = Lists.newLinkedList();
+ final Map<MaterializedField, ValueVector<?>> fieldVectorMap = Maps.newHashMap();
+
+ private VectorHolder holder = new VectorHolder(vectors);
+>>>>>>> Build working
private BatchSchema schema;
private int recordCount;
private boolean schemaChanged = true;
@@ -83,6 +107,7 @@ public class ScanBatch implements RecordBatch {
}
private void releaseAssets() {
+<<<<<<< HEAD
fields.forEach(new IntObjectProcedure<ValueVector>() {
@Override
public void apply(int key, ValueVector value) {
@@ -102,9 +127,13 @@ public class ScanBatch implements RecordBatch {
throw new InvalidValueAccessor(String.format(
"You requested a field accessor of type %s for field id %d but the actual type was %s.",
clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
+=======
+ for(ValueVector<?> v : vectors){
+ v.close();
+>>>>>>> Build working
}
}
-
+
@Override
public IterOutcome next() {
while ((recordCount = currentReader.next()) == 0) {
@@ -132,11 +161,34 @@ public class ScanBatch implements RecordBatch {
}
}
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVector(SchemaPath path) {
+ return holder.getValueVector(path);
+ }
+
+ @Override
+ public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> clazz) {
+ return holder.getValueVector(fieldId, clazz);
+ }
+
+
private class Mutator implements OutputMutator {
private SchemaBuilder builder = BatchSchema.newBuilder();
- public void removeField(int fieldId) throws SchemaChangeException {
+ public void removeField(MaterializedField field) throws SchemaChangeException {
schemaChanged();
+<<<<<<< HEAD
ValueVector v = fields.remove(fieldId);
if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
v.close();
@@ -146,8 +198,18 @@ public class ScanBatch implements RecordBatch {
schemaChanged();
ValueVector v = fields.put(fieldId, vector);
vector.getField();
+=======
+ ValueVector<?> vector = fieldVectorMap.remove(field);
+ if (vector == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
+ vectors.remove(vector);
+ vector.close();
+ }
+
+ public void addField(ValueVector<?> vector) {
+ vectors.add(vector);
+ fieldVectorMap.put(vector.getField(), vector);
+>>>>>>> Build working
builder.addField(vector.getField());
- if (v != null) v.close();
}
@Override
@@ -160,7 +222,7 @@ public class ScanBatch implements RecordBatch {
@Override
public WritableBatch getWritableBatch() {
- return WritableBatch.get(this.getRecordCount(), fields);
+ return WritableBatch.get(this.getRecordCount(), vectors);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
new file mode 100644
index 0000000..4209daa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
@@ -0,0 +1,39 @@
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Lookup helper over a list of value vectors. Vectors can be located either by
+ * schema path (yielding their list index) or directly by that index.
+ */
+public class VectorHolder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorHolder.class);
+
+  private List<ValueVector<?>> vectors;
+
+  /**
+   * @param vectors the list to search; the reference itself is retained, not copied
+   */
+  public VectorHolder(List<ValueVector<?>> vectors) {
+    this.vectors = vectors;
+  }
+
+  /**
+   * Scans the list in order for the first vector whose field matches the path.
+   *
+   * @param path the schema path to resolve
+   * @return a {@link TypedFieldId} built from the field's type and its list index,
+   *         or {@code null} when no vector matches
+   */
+  public TypedFieldId getValueVector(SchemaPath path) {
+    int index = 0;
+    while (index < vectors.size()) {
+      ValueVector<?> candidate = vectors.get(index);
+      if (candidate.getField().matches(path)) {
+        return new TypedFieldId(candidate.getField().getType(), index);
+      }
+      index++;
+    }
+    return null;
+  }
+
+  /**
+   * Fetches the vector at the given list index when its runtime class is exactly
+   * {@code clazz}; subclasses do not qualify because the classes are compared by identity.
+   *
+   * @param fieldId index into the backing list
+   * @param clazz the exact vector class the caller expects
+   * @return the vector cast to the caller's type, or {@code null} (after logging a
+   *         warning) when the stored vector is of a different class
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<?> clazz) {
+    ValueVector<?> held = vectors.get(fieldId);
+    assert held != null;
+    if (held.getClass() == clazz) {
+      return (T) held;
+    }
+    logger.warn(String.format(
+        "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
+        clazz.getCanonicalName(), held.getClass().getCanonicalName()));
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index fcbd272..f4921b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -17,17 +17,26 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+<<<<<<< HEAD
+=======
+import org.apache.drill.common.expression.SchemaPath;
+>>>>>>> Build working
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.WritableBatch;
+<<<<<<< HEAD
import org.apache.drill.exec.vector.ValueVector;
+=======
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+>>>>>>> Build working
public class WireRecordBatch implements RecordBatch{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
@@ -64,11 +73,32 @@ public class WireRecordBatch implements RecordBatch{
fragProvider.kill(context);
}
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
@Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVector(SchemaPath path) {
+ return batchLoader.getValueVector(path);
+ }
+
+ @Override
+<<<<<<< HEAD
public <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+=======
+ public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> clazz) {
+>>>>>>> Build working
return batchLoader.getValueVector(fieldId, clazz);
}
+
@Override
public IterOutcome next() {
RawFragmentBatch batch = fragProvider.getNext();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
new file mode 100644
index 0000000..f547989
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvalSetupException.java
@@ -0,0 +1,5 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+/**
+ * Checked exception in the filter package; by its name, intended for failures
+ * while setting up an evaluation (no throw sites are visible in this change).
+ *
+ * <p>Besides the original no-argument form, the usual message/cause constructors
+ * are provided so callers can report context and preserve the underlying failure.
+ */
+public class EvalSetupException extends Exception{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EvalSetupException.class);
+
+  /** Creates an exception with no detail message. */
+  public EvalSetupException() {
+    super();
+  }
+
+  /**
+   * @param message description of what failed during setup
+   */
+  public EvalSetupException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of what failed during setup
+   * @param cause the underlying failure, preserved for the stack trace
+   */
+  public EvalSetupException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param cause the underlying failure, preserved for the stack trace
+   */
+  public EvalSetupException(Throwable cause) {
+    super(cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
new file mode 100644
index 0000000..3176c41
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java
@@ -0,0 +1,13 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.record.vector.SelectionVector2;
+
+// Placeholder for a filter predicate. Nothing is implemented yet: the constructor
+// ignores its argument and the selection vector field is never assigned or read.
+public class EvaluationPredicate {
+ // NOTE(review): presumably meant to hold the rows selected by the predicate — confirm once implemented.
+ private SelectionVector2 vector;
+
+ // Package-private constructor; 'pred' is currently unused.
+ // NOTE(review): 'pred' looks like a textual predicate expression — confirm intended format.
+ EvaluationPredicate(String pred){
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
new file mode 100644
index 0000000..158350f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ExampleFilter.java
@@ -0,0 +1,113 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Skeleton of a filtering {@link RecordBatch} that sits on top of an incoming batch.
+ *
+ * <p>Most accessors are stubs returning {@code null}, and selection-vector generation
+ * is not implemented (it reports {@code -1}). Note that nothing in this class assigns
+ * {@code incoming}; it must be populated before any method that delegates to it is used.
+ */
+public class ExampleFilter implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExampleFilter.class);
+
+  //private EvalutationPredicates []
+  private RecordBatch incoming;
+  private BatchSchema outboundSchema;
+  private int recordCount;
+
+  /**
+   * Rebuilds the outbound schema from the incoming schema, marking it as using a
+   * two-byte selection vector. Currently not invoked (see {@link #next()}).
+   *
+   * @throws SchemaChangeException if the schema builder rejects the fields
+   */
+  private void reconfigureSchema() throws SchemaChangeException {
+    BatchSchema in = incoming.getSchema();
+    outboundSchema = BatchSchema.newBuilder().addFields(in).setSelectionVectorMode(BatchSchema.SelectionVectorMode.TWO_BYTE).build();
+  }
+
+  /**
+   * Stub: selection is not implemented yet.
+   *
+   * @return always {@code -1}
+   */
+  private int generateSelectionVector(){
+    return -1;
+  }
+
+  /** @return the fragment context of the incoming batch */
+  @Override
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
+
+  /** @return the outbound schema; {@code null} until a schema has been configured */
+  @Override
+  public BatchSchema getSchema() {
+    return outboundSchema;
+  }
+
+  /** @return the record count stored by the last call to {@link #next()} */
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  /** Stub: does nothing and does not propagate the kill upstream. */
+  @Override
+  public void kill() {
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return null;
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    return null;
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return null;
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass) {
+    return null;
+  }
+
+  /**
+   * Advances the incoming batch once and reacts to its outcome.
+   *
+   * @return {@code NONE} or {@code STOP} when upstream reports them; otherwise the
+   *         upstream outcome ({@code OK} or {@code OK_NEW_SCHEMA}) after the record
+   *         count has been refreshed
+   * @throws UnsupportedOperationException for any other upstream outcome
+   */
+  @Override
+  public IterOutcome next() {
+    // Pull from upstream exactly once. Switching on a second incoming.next() call
+    // would consume and drop a batch, and dispatch on a different outcome than the
+    // one that is returned.
+    IterOutcome out = incoming.next();
+    switch (out) {
+    case NONE:
+      return IterOutcome.NONE;
+    case OK_NEW_SCHEMA:
+      // Schema reconfiguration is disabled for now (it throws a checked exception
+      // that this method does not declare).
+      //reconfigureSchema();
+      // intentional fall through
+    case OK:
+      this.recordCount = generateSelectionVector();
+      return out;
+    case STOP:
+      return IterOutcome.STOP;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public WritableBatch getWritableBatch() {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
new file mode 100644
index 0000000..69daae0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectEvaluator.java
@@ -0,0 +1,12 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Two-method contract for per-record projection work: one setup call against an
+ * incoming batch, then one call per record.
+ */
+public interface ProjectEvaluator {
+  // Interface fields are implicitly public static final.
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectEvaluator.class);
+
+  /**
+   * Prepares the evaluator for the given incoming batch.
+   *
+   * @param context the fragment context
+   * @param incoming the batch records will be read from
+   * @throws SchemaChangeException if setup cannot be completed
+   */
+  void setupEvaluators(FragmentContext context, RecordBatch incoming) throws SchemaChangeException;
+
+  /**
+   * Processes a single record.
+   *
+   * @param inIndex index of the record to read
+   * @param outIndex index at which to write the result
+   */
+  void doPerRecordWork(int inIndex, int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
new file mode 100644
index 0000000..cfdb7bc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -0,0 +1,218 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link RecordBatch} that applies a {@link Project} operator to an incoming batch.
+ *
+ * <p>Whenever upstream reports a new schema, a fresh {@link Projector} is generated.
+ * Expressions that are plain vector reads (and have no selection vector in play) are
+ * registered as transfers; every other expression gets a newly created output vector
+ * and a generated write. Vector lookup and {@link #getWritableBatch()} are still stubs
+ * that return {@code null}.
+ */
+public class ProjectRecordBatch implements RecordBatch{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+
+  private final Project pop;
+  private final RecordBatch incoming;
+  private final FragmentContext context;
+  private BatchSchema outSchema;
+  private Projector projector;
+  // Vectors that must be (re)allocated for each batch because they are written by evaluation.
+  private List<ValueVector<?>> allocationVectors;
+  // All output vectors, in expression order.
+  private List<ValueVector<?>> outputVectors;
+
+
+  /**
+   * @param pop the project operator configuration supplying the expressions
+   * @param incoming the upstream batch to project
+   * @param context the fragment context used for allocation, code generation and failure reporting
+   */
+  public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
+    this.pop = pop;
+    this.incoming = incoming;
+    this.context = context;
+  }
+
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  /**
+   * @return the output schema
+   * @throws NullPointerException if no schema has been built yet
+   */
+  @Override
+  public BatchSchema getSchema() {
+    Preconditions.checkNotNull(outSchema);
+    return outSchema;
+  }
+
+  /** @return the record count of the incoming batch */
+  @Override
+  public int getRecordCount() {
+    return incoming.getRecordCount();
+  }
+
+  /** Propagates the kill request upstream. */
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public TypedFieldId getValueVector(SchemaPath path) {
+    return null;
+  }
+
+  /** Stub: always {@code null}. */
+  @Override
+  public <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass) {
+    return null;
+  }
+
+  /**
+   * Advances the incoming batch and projects its records.
+   *
+   * @return the upstream outcome, or {@code STOP} if building a projector for a new schema failed
+   * @throws UnsupportedOperationException for an unrecognised upstream outcome
+   */
+  @Override
+  public IterOutcome next() {
+
+    IterOutcome upstream = incoming.next();
+    switch(upstream){
+    case NONE:
+    case NOT_YET:
+    case STOP:
+      return upstream;
+    case OK_NEW_SCHEMA:
+      try{
+        projector = createNewProjector();
+      }catch(SchemaChangeException ex){
+        incoming.kill();
+        context.fail(ex);
+        return IterOutcome.STOP;
+      }
+      // fall through.
+    case OK:
+      int recordCount = incoming.getRecordCount();
+      for(ValueVector<?> v : this.allocationVectors){
+        v.allocateNew(recordCount);
+      }
+      projector.projectRecords(recordCount, 0);
+      return upstream; // change if upstream changed, otherwise normal.
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+
+  /**
+   * Builds a projector for the current incoming schema, replacing the output vectors
+   * (closing any previous ones) and the output schema as a side effect.
+   *
+   * @return a generated projector that has been set up against the incoming batch
+   * @throws SchemaChangeException if an expression cannot be materialized, the generated
+   *         class cannot be loaded, or projector setup fails
+   */
+  private Projector createNewProjector() throws SchemaChangeException{
+    this.allocationVectors = Lists.newArrayList();
+    if(outputVectors != null){
+      for(ValueVector<?> v : outputVectors){
+        v.close();
+      }
+    }
+    this.outputVectors = Lists.newArrayList();
+
+    final List<NamedExpression> exprs = pop.getExprs();
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final List<TransferPairing<?>> transfers = Lists.newArrayList();
+
+    final CodeGenerator cg = new CodeGenerator("setupEvaluators", "doPerRecordWork", context.getFunctionRegistry());
+
+    for(int i =0; i < exprs.size(); i++){
+      final NamedExpression namedExpression = exprs.get(i);
+      final MaterializedField outputField = getMaterializedField(namedExpression);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector);
+      if(collector.hasErrors()){
+        throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+      }
+
+      // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
+      if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVector() == SelectionVectorMode.NONE){
+        ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+        ValueVector<?> vvIn = incoming.getValueVectorById(vectorRead.getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType()));
+        // Guard the vector just fetched: the lookup can legitimately come back empty,
+        // whereas 'incoming' has already been dereferenced above.
+        Preconditions.checkNotNull(vvIn);
+
+        TransferPairing<?> tp = vvIn.getTransferPair(outputField);
+        transfers.add(tp);
+        outputVectors.add(tp.getTo());
+      }else{
+        // need to do evaluation.
+        ValueVector<?> vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+        allocationVectors.add(vector);
+        outputVectors.add(vector);
+        ValueVectorWriteExpression write = new ValueVectorWriteExpression(outputVectors.size() - 1, expr);
+        cg.addNextWrite(write);
+      }
+
+    }
+
+    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
+    for(ValueVector<?> v : outputVectors){
+      bldr.addField(v.getField());
+    }
+    this.outSchema = bldr.build();
+
+    final Projector newProjector;
+    try {
+      newProjector = context.getImplementationClass(Projector.TEMPLATE_DEFINITION, cg);
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    }
+    // Hand over the collected transfers; without this call they are silently discarded
+    // and the projector is used without ever being set up.
+    newProjector.setup(context, incoming, transfers);
+    return newProjector;
+  }
+
+
+  /** Stub: always {@code null}. */
+  @Override
+  public WritableBatch getWritableBatch() {
+    return null;
+  }
+
+
+  /**
+   * @param ex the named expression whose reference and type describe the output field
+   * @return a materialized field named after the expression's reference, typed like its expression
+   */
+  private MaterializedField getMaterializedField(NamedExpression ex){
+    return new MaterializedField(getFieldDef(ex.getRef(), ex.getExpr().getMajorType()));
+  }
+
+  /**
+   * @param path the schema path providing the field's name parts
+   * @param type the field's major type
+   * @return the corresponding field definition
+   */
+  private FieldDef getFieldDef(SchemaPath path, MajorType type){
+    return FieldDef //
+        .newBuilder() //
+        .addAllName(getNameParts(path.getRootSegment())) //
+        .setMajorType(type) //
+        .build();
+  }
+
+  /**
+   * Converts a chain of path segments into name parts, one per segment.
+   *
+   * @param seg the first segment of the chain; may be {@code null}
+   * @return the name parts in chain order; empty when {@code seg} is {@code null}
+   */
+  private List<NamePart> getNameParts(PathSegment seg){
+    List<NamePart> parts = Lists.newArrayList();
+    while(seg != null){
+      if(seg.isArray()){
+        parts.add(NamePart.newBuilder().setType(Type.ARRAY).build());
+      }else{
+        parts.add(NamePart.newBuilder().setType(Type.NAME).setName(seg.getNameSegment().getPath().toString()).build());
+      }
+      // Step to the next segment; without this the loop never terminates.
+      seg = seg.getChild();
+    }
+    return parts;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
new file mode 100644
index 0000000..31c418c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -0,0 +1,20 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Contract for projection implementations: {@link #setup} is given the incoming batch and the
+ * vector transfers to perform, and {@link #projectRecords} processes a run of records.
+ */
+public interface Projector {
+
+  /**
+   * Prepares the projector for a given incoming batch.
+   *
+   * @param context   the fragment context
+   * @param incoming  the batch records will be read from
+   * @param transfers the vector pairings to be transferred
+   * @throws SchemaChangeException if setup cannot be completed for the incoming schema
+   */
+  public abstract void setup(FragmentContext context, RecordBatch incoming, List<TransferPairing<?>> transfers) throws SchemaChangeException;
+
+
+  /**
+   * Projects records into the output.
+   *
+   * @param recordCount      number of records to project
+   * @param firstOutputIndex output index that receives the first projected record
+   */
+  public abstract void projectRecords(int recordCount, int firstOutputIndex);
+
+  // The template class is ProjectorTemplate; the previous value ("...project.ProjectTemplate")
+  // did not match the class name declared in this package.
+  public static TemplateClassDefinition<Projector, Void> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector, Void>( //
+      Projector.class, "org.apache.drill.exec.physical.impl.project.ProjectorTemplate", ProjectEvaluator.class, Void.class);
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
new file mode 100644
index 0000000..60af7d2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -0,0 +1,101 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public abstract class ProjectorTemplate implements Projector {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class);
+
+ private ImmutableList<TransferPairing<?>> transfers;
+ private SelectionVector2 vector2;
+ private SelectionVector4 vector4;
+ private SelectionVectorMode svMode;
+
+ public ProjectorTemplate(final FragmentContext context, final RecordBatch incomingBatch, final Project pop, FunctionImplementationRegistry funcRegistry) throws SchemaChangeException{
+ super();
+ }
+
+ @Override
+ public final void projectRecords(final int recordCount, int firstOutputIndex) {
+ switch(svMode){
+ case FOUR_BYTE:
+ throw new UnsupportedOperationException();
+
+
+ case TWO_BYTE:
+ final int count = recordCount*2;
+ for(int i = 0; i < count; i+=2, firstOutputIndex++){
+ doPerRecordWork(vector2.getIndex(i), firstOutputIndex);
+ }
+ return;
+
+
+ case NONE:
+
+ for(TransferPairing<?> t : transfers){
+ t.transfer();
+ }
+ final int countN = recordCount;
+ for (int i = 0; i < countN; i++, firstOutputIndex++) {
+ doPerRecordWork(i, firstOutputIndex);
+ }
+ return;
+
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public final void setup(FragmentContext context, RecordBatch incoming, List<TransferPairing<?>> transfers) throws SchemaChangeException{
+
+ this.svMode = incoming.getSchema().getSelectionVector();
+ switch(svMode){
+ case FOUR_BYTE:
+ this.vector4 = incoming.getSelectionVector4();
+ break;
+ case TWO_BYTE:
+ this.vector2 = incoming.getSelectionVector2();
+ break;
+ }
+ this.transfers = ImmutableList.copyOf(transfers);
+ setupEvaluators(context, incoming);
+ }
+
+ protected abstract void setupEvaluators(FragmentContext context, RecordBatch incoming) throws SchemaChangeException;
+ protected abstract void doPerRecordWork(int inIndex, int outIndex);
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java
new file mode 100644
index 0000000..2b4ac81
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/TransferPairing.java
@@ -0,0 +1,35 @@
+package org.apache.drill.exec.physical.impl.project;
+
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Couples a source vector with a destination vector of the same type so that the source can be
+ * handed over to the destination with a single {@link #transfer()} call.
+ */
+public class TransferPairing<T extends ValueVector<T>> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransferPairing.class);
+
+  final T from;
+  final T to;
+
+  /**
+   * Factory for a pairing of the two given vectors.
+   *
+   * @param from the source vector
+   * @param to   the destination vector
+   * @return a new pairing holding both vectors
+   */
+  public static <T extends ValueVector<T>> TransferPairing<T> getTransferPairing(T from, T to){
+    return new TransferPairing<>(from, to);
+  }
+
+  protected TransferPairing(T from, T to) {
+    this.from = from;
+    this.to = to;
+  }
+
+  /** Invokes {@code transferTo} on the source vector, passing the destination vector. */
+  public void transfer(){
+    from.transferTo(to);
+  }
+
+  /** @return the source vector */
+  public T getFrom() {
+    return from;
+  }
+
+  /** @return the destination vector */
+  public T getTo() {
+    return to;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 3c2df61..1148c93 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.coord.DrillbitEndpointSerDe;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentLeaf;
@@ -28,7 +29,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
import org.apache.drill.exec.record.MajorTypeSerDe;
import com.fasterxml.jackson.core.JsonProcessingException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index b26e742..bb07e56 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,13 +23,17 @@ import java.util.List;
public class BatchSchema implements Iterable<MaterializedField> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
-
+ final SelectionVectorMode selectionVector;
+ ;
private final List<MaterializedField> fields;
- final boolean hasSelectionVector;
- BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
+ BatchSchema(SelectionVectorMode selectionVector, List<MaterializedField> fields) {
this.fields = fields;
- this.hasSelectionVector = hasSelectionVector;
+ this.selectionVector = selectionVector;
+ }
+
+ public static SchemaBuilder newBuilder() {
+ return new SchemaBuilder();
}
@Override
@@ -37,16 +41,24 @@ public class BatchSchema implements Iterable<MaterializedField> {
return fields.iterator();
}
- public static SchemaBuilder newBuilder() {
- return new SchemaBuilder();
+ public SelectionVectorMode getSelectionVector() {
+ return selectionVector;
}
@Override
public String toString() {
- return "BatchSchema [fields=" + fields + ", hasSelectionVector=" + hasSelectionVector + "]";
+ return "BatchSchema [fields=" + fields + ", selectionVector=" + selectionVector + "]";
}
-
-
-
+ /**
+  * The kind of selection vector, if any, that accompanies a batch.
+  */
+ public static enum SelectionVectorMode {
+   NONE(-1, false), TWO_BYTE(2, true), FOUR_BYTE(4, true);
+
+   /** Whether this mode carries a selection vector (false only for NONE). */
+   public final boolean hasSelectionVector;
+   /** Size associated with the mode: 2 or 4, or -1 for NONE. NOTE(review): presumably bytes per entry - confirm. */
+   public final int size;
+
+   SelectionVectorMode(int size, boolean hasSelectionVector) {
+     this.size = size;
+     // Previously this argument was dropped, leaving the field false for every constant.
+     this.hasSelectionVector = hasSelectionVector;
+   }
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java
deleted file mode 100644
index 391aec5..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/ExpressionTreeMaterializer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-
-package org.apache.drill.exec.record;
-
-import com.google.common.collect.Lists;
-import org.apache.drill.common.expression.*;
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.expression.visitors.ExprVisitor;
-
-import java.util.List;
-
-public class ExpressionTreeMaterializer {
- public LogicalExpression Materialize(LogicalExpression expr, BatchSchema schema, ErrorCollector errorCollector) {
- return expr.accept(new MaterializeVisitor(schema, errorCollector));
- }
-
- private class MaterializeVisitor implements ExprVisitor<LogicalExpression> {
- private final ErrorCollector errorCollector;
- private final BatchSchema schema;
- private boolean isModified; // Flag to track if children is changed
-
- public MaterializeVisitor(BatchSchema schema, ErrorCollector errorCollector) {
- this.schema = schema;
- this.errorCollector = errorCollector;
- isModified = false;
- }
-
- private LogicalExpression validateNewExpr(LogicalExpression newExpr) {
- StringBuilder stringBuilder = new StringBuilder();
- newExpr.addToString(stringBuilder);
- newExpr.resolveAndValidate(stringBuilder.toString(), errorCollector);
- return newExpr;
- }
-
- @Override
- public LogicalExpression visitFunctionCall(FunctionCall call) {
- List<LogicalExpression> args = Lists.newArrayList(call.iterator());
- boolean hasChanged = false;
- for (int i = 0; i < args.size(); ++i) {
- LogicalExpression newExpr = args.get(i).accept(this);
- if (isModified) {
- hasChanged = true;
- args.set(i, newExpr);
- isModified = false;
- }
- }
-
- if(hasChanged) {
- isModified = true;
- return validateNewExpr(new FunctionCall(call.getDefinition(), args));
- }
-
- return call;
- }
-
- @Override
- public LogicalExpression visitIfExpression(IfExpression ifExpr) {
- List<IfExpression.IfCondition> conditions = Lists.newArrayList(ifExpr.iterator());
- boolean hasChanged = false;
- LogicalExpression newElseExpr = null;
- if(ifExpr.elseExpression != null) {
- newElseExpr = ifExpr.elseExpression.accept(this);
- hasChanged = isModified;
- }
-
- isModified = false;
-
- for(int i = 0; i < conditions.size(); ++i) {
- IfExpression.IfCondition condition = conditions.get(i);
-
- LogicalExpression newCondition = condition.condition.accept(this);
- boolean modified = isModified;
- isModified = false;
- LogicalExpression newExpr = condition.expression.accept(this);
- if(modified || isModified) {
- conditions.set(i, new IfExpression.IfCondition(newCondition, newExpr));
- hasChanged = true;
- isModified = false;
- }
- }
-
- if(hasChanged) {
- isModified = true;
- return validateNewExpr(IfExpression.newBuilder().setElse(newElseExpr).addConditions(conditions).build());
- }
-
- return ifExpr;
- }
-
- @Override
- public LogicalExpression visitSchemaPath(SchemaPath path) {
- for (MaterializedField field : schema) {
- if (field.getType() != DataType.LATEBIND && field.matches(path)) {
- isModified = true;
- return validateNewExpr(new FieldReference(path.getPath().toString(), field.getType()));
- }
- }
-
- return path;
- }
-
- @Override
- public LogicalExpression visitLongExpression(ValueExpressions.LongExpression intExpr) {
- return intExpr;
- }
-
- @Override
- public LogicalExpression visitDoubleExpression(ValueExpressions.DoubleExpression dExpr) {
- return dExpr;
- }
-
- @Override
- public LogicalExpression visitBoolean(ValueExpressions.BooleanExpression e) {
- return e;
- }
-
- @Override
- public LogicalExpression visitQuotedString(ValueExpressions.QuotedString e) {
- return e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
index 718396e..8799546 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MajorTypeSerDe.java
@@ -19,9 +19,9 @@ package org.apache.drill.exec.record;
import java.io.IOException;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
-import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index b692a93..6cf7087 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -18,18 +18,17 @@
package org.apache.drill.exec.record;
import java.util.Iterator;
-import java.util.List;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.RecordField.ValueMode;
-import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.record.vector.TypeHelper;
-public class MaterializedField implements Comparable<MaterializedField> {
+public class MaterializedField{
private final FieldDef def;
public MaterializedField(FieldDef def) {
@@ -40,12 +39,10 @@ public class MaterializedField implements Comparable<MaterializedField> {
return new MaterializedField(def);
}
- public static MaterializedField create(SchemaPath path, int fieldId, int parentId, MajorType type) {
+ public static MaterializedField create(SchemaPath path, MajorType type) {
FieldDef.Builder b = FieldDef.newBuilder();
- b.setFieldId(fieldId);
b.setMajorType(type);
addSchemaPathToFieldDef(path, b);
- b.setParentId(parentId);
return create(b.build());
}
@@ -90,10 +87,6 @@ public class MaterializedField implements Comparable<MaterializedField> {
return def.getMajorType().getWidth();
}
- public int getFieldId() {
- return def.getFieldId();
- }
-
public MajorType getType() {
return def.getMajorType();
}
@@ -120,6 +113,9 @@ public class MaterializedField implements Comparable<MaterializedField> {
throw new UnsupportedOperationException();
}
return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build());
+
+ public Class<?> getValueClass() {
+ return TypeHelper.getValueVectorClass(getType().getMinorType(), getDataMode());
}
public boolean matches(SchemaPath path) {
@@ -141,46 +137,8 @@ public class MaterializedField implements Comparable<MaterializedField> {
}
// we've reviewed all path segments. confirm that we don't have any extra name parts.
return !iter.hasNext();
-
- private void check(String name, Object val1, Object expected) throws SchemaChangeException{
- if(expected.equals(val1)) return;
- throw new SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name, val1, name, expected);
}
- public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
- if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s", this.type, expected.type);
- if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
- check("fieldId", this.fieldId, expected.fieldId);
- check("nullability", this.nullable, expected.nullable);
- check("valueMode", this.mode, expected.mode);
- }
-
- // private void check(String name, Object val1, Object expected) throws SchemaChangeException{
- // if(expected.equals(val1)) return;
- // throw new
- // SchemaChangeException("Expected and actual field definitions don't match. Actual %s: %s, expected %s: %s", name,
- // val1, name, expected);
- // }
-
- // public void checkMaterialization(MaterializedField expected) throws SchemaChangeException{
- // if(this.type == expected.type || expected.type == DataType.LATEBIND) throw new
- // SchemaChangeException("Expected and actual field definitions don't match. Actual DataType: %s, expected DataTypes: %s",
- // this.type, expected.type);
- // if(expected.valueClass != null) check("valueClass", this.valueClass, expected.valueClass);
- // check("fieldId", this.fieldId, expected.fieldId);
- // check("nullability", this.nullable, expected.nullable);
- // check("valueMode", this.mode, expected.mode);
- // }
- //
- // public MaterializedField getNullableVersion(Class<?> valueClass){
- // return new MaterializedField(path, fieldId, type, true, mode, valueClass);
- // }
-
- @Override
- public int compareTo(MaterializedField o) {
- return Integer.compare(this.getFieldId(), o.getFieldId());
- }
-
@Override
public String toString() {
return "MaterializedField [" + def.toString() + "]";
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java
new file mode 100644
index 0000000..4ab908f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/NullExpression.java
@@ -0,0 +1,32 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+
+/**
+ * A LogicalExpression whose major type is the NULL minor type in OPTIONAL mode. It has no known
+ * position and is dispatched to visitors through {@code visitUnknown}.
+ */
+public class NullExpression implements LogicalExpression{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullExpression.class);
+
+  /** Shared instance. */
+  public static final NullExpression INSTANCE = new NullExpression();
+
+  /** The type reported by {@link #getMajorType()}. */
+  final MajorType t = MajorType.newBuilder().setMinorType(MinorType.NULL).setMode(DataMode.OPTIONAL).build();
+
+  @Override
+  public ExpressionPosition getPosition() {
+    return ExpressionPosition.UNKNOWN;
+  }
+
+  @Override
+  public MajorType getMajorType() {
+    return t;
+  }
+
+  @Override
+  public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V value) throws E {
+    return visitor.visitUnknown(this, value);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index c6b7888..40447ec 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -17,8 +17,12 @@
******************************************************************************/
package org.apache.drill.exec.record;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.record.vector.SelectionVector2;
+import org.apache.drill.exec.record.vector.SelectionVector4;
+import org.apache.drill.exec.record.vector.ValueVector;
/**
* A record batch contains a set of field values for a particular range of records. In the case of a record batch
@@ -39,6 +43,12 @@ public interface RecordBatch {
NOT_YET // used by batches that haven't received incoming data yet.
}
+ public static enum SetupOutcome { // Result codes for a setup step. NOTE(review): producers/consumers are not visible in this hunk - confirm exact semantics.
+ OK,
+ OK_NEW_SCHEMA, // presumably: setup succeeded and the schema changed - confirm
+ FAILED
+ }
+
/**
* Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
* level information.
@@ -67,12 +77,18 @@ public interface RecordBatch {
*/
public void kill();
-
- public abstract <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor;
-
-// public abstract void getDictReader(int fieldId, Class<?> clazz) throws InvalidValueAccessor;
-//
-// public abstract void getRleReader(int fieldId, Class<?> clazz) throws InvalidValueAccessor;
+ public abstract SelectionVector2 getSelectionVector2();
+ public abstract SelectionVector4 getSelectionVector4();
+
+ /**
+ * Look up the type and local field id of the value vector located at the given path.
+ * @param path The path where the vector should be located.
+ * @return The type and local field id associated with the vector at that path.
+ */
+ public abstract TypedFieldId getValueVector(SchemaPath path);
+
+
+ public abstract <T extends ValueVector<T>> T getValueVectorById(int fieldId, Class<?> vvClass);
/**
* Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
@@ -89,4 +105,41 @@ public interface RecordBatch {
*/
public WritableBatch getWritableBatch();
+ public static class TypedFieldId{
+ final MajorType type;
+ final int fieldId;
+ public TypedFieldId(MajorType type, int fieldId) {
+ super();
+ this.type = type;
+ this.fieldId = fieldId;
+ }
+ public MajorType getType() {
+ return type;
+ }
+ public int getFieldId() {
+ return fieldId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TypedFieldId other = (TypedFieldId) obj;
+ if (fieldId != other.fieldId)
+ return false;
+ if (type == null) {
+ if (other.type != null)
+ return false;
+ } else if (!type.equals(other.type))
+ return false;
+ return true;
+ }
+
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index a2dbd81..c3db9f0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -21,22 +21,28 @@ import io.netty.buffer.ByteBuf;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
+import com.beust.jcommander.internal.Lists;
+import com.beust.jcommander.internal.Maps;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
+import com.google.common.collect.ImmutableList;
-public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>{
+public class RecordBatchLoader implements Iterable<ValueVector<?>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
- private IntObjectOpenHashMap<ValueVector> vectors = new IntObjectOpenHashMap<ValueVector>();
+ private List<ValueVector<?>> vectors = Lists.newArrayList();
private final BufferAllocator allocator;
private int recordCount;
private BatchSchema schema;
@@ -60,64 +66,72 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
// logger.debug("Loading record batch with def {} and data {}", def, buf);
this.recordCount = def.getRecordCount();
boolean schemaChanged = false;
+
+ Map<MaterializedField, ValueVector<?>> oldFields = Maps.newHashMap();
+ for(ValueVector<?> v : this.vectors){
+ oldFields.put(v.getField(), v);
+ }
- IntObjectOpenHashMap<ValueVector> newVectors = new IntObjectOpenHashMap<ValueVector>();
+ List<ValueVector<?>> newVectors = Lists.newArrayList();
List<FieldMetadata> fields = def.getFieldList();
int bufOffset = 0;
for (FieldMetadata fmd : fields) {
FieldDef fieldDef = fmd.getDef();
- ValueVector v = vectors.remove(fieldDef.getFieldId());
- if (v != null) {
- if (v.getField().getDef().equals(fieldDef)) {
- ValueVector.Mutator m = v.getMutator();
- v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
- newVectors.put(fieldDef.getFieldId(), v);
- continue;
- } else {
- v.close();
- v = null;
- }
+ ValueVector<?> v = oldFields.remove(fieldDef);
+ if(v != null){
+ newVectors.add(v);
+ continue;
}
- // if we arrive here, either the metadata didn't match, or we didn't find a vector.
+
+ // if we arrive here, we didn't have a matching vector.
schemaChanged = true;
MaterializedField m = new MaterializedField(fieldDef);
v = TypeHelper.getNewVector(m, allocator);
v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
newVectors.put(fieldDef.getFieldId(), v);
+ v.setTo(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+ newVectors.add(v);
}
- if(!vectors.isEmpty()){
+ if(!oldFields.isEmpty()){
schemaChanged = true;
- for(IntObjectCursor<ValueVector> cursor : newVectors){
- cursor.value.close();
+ for(ValueVector<?> v : oldFields.values()){
+ v.close();
}
-
}
- if(schemaChanged){
- // rebuild the schema.
- SchemaBuilder b = BatchSchema.newBuilder();
- for(IntObjectCursor<ValueVector> cursor : newVectors){
- b.addField(cursor.value.getField());
- }
- b.setSelectionVector(false);
- this.schema = b.build();
+ // rebuild the schema.
+ SchemaBuilder b = BatchSchema.newBuilder();
+ for(ValueVector<?> v : newVectors){
+ b.addField(v.getField());
}
- vectors = newVectors;
+ b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+ this.schema = b.build();
+ vectors = ImmutableList.copyOf(newVectors);
return schemaChanged;
}
+ public TypedFieldId getValueVector(SchemaPath path) {
+ for(int i =0; i < vectors.size(); i++){
+ ValueVector<?> vv = vectors.get(i);
+ if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i);
+ }
+ return null;
+ }
+
@SuppressWarnings("unchecked")
- public <T extends ValueVector> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
- ValueVector v = vectors.get(fieldId);
+ public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<?> clazz) { // Returns the vector at fieldId if its class is exactly clazz; otherwise logs a warning and returns null.
+ ValueVector<?> v = vectors.get(fieldId); // fieldId is used directly as the position in the vectors list
assert v != null;
- if (v.getClass() != clazz)
- throw new InvalidValueAccessor(String.format(
+ if (v.getClass() != clazz){ // identity comparison of classes: a subclass of clazz does not qualify
+ logger.warn(String.format( // NOTE(review): the removed code threw InvalidValueAccessor here; callers now have to check for null
"Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+ return null;
+ }
return (T) v;
}
@@ -131,7 +145,7 @@ public class RecordBatchLoader implements Iterable<IntObjectCursor<ValueVector>>
}
@Override
- public Iterator<IntObjectCursor<ValueVector>> iterator() {
+ public Iterator<ValueVector<?>> iterator() { // Iterates the vectors themselves; the removed signature iterated IntObjectCursor entries.
return this.vectors.iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
index 1e25b1a..20cc82c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -17,15 +17,12 @@
******************************************************************************/
package org.apache.drill.exec.record;
-import java.util.Collections;
import java.util.List;
+import java.util.Set;
-import org.apache.drill.common.expression.types.DataType;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.RecordField.ValueMode;
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Lists;
/**
@@ -33,40 +30,39 @@ import com.google.common.collect.Lists;
* builder will always check that this schema is an equal or more materialized version of the current schema.
*/
public class SchemaBuilder {
- private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
- private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
+ private Set<MaterializedField> fields = Sets.newHashSet();
- private boolean hasSelectionVector;
-
- public SchemaBuilder(BatchSchema expected) {
- for (MaterializedField f : expected) {
- expectedFields.put(f.getFieldId(), f);
- }
- hasSelectionVector = expected.hasSelectionVector;
- }
+ private BatchSchema.SelectionVectorMode selectionVectorMode;
SchemaBuilder() {
}
- /**
- * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass
- * will be set to null.
- *
- * @param fieldId
- * The desired fieldId. Should be unique for this BatchSchema.
- * @param nullable
- * Whether this field supports nullability.
- * @param mode
- * @throws SchemaChangeException
- */
+// /**
+// * Add a field where we don't have type information. In this case, DataType will be set to LATEBIND and valueClass
+// * will be set to null.
+// *
+// * @param fieldId
+// * The desired fieldId. Should be unique for this BatchSchema.
+// * @param nullable
+// * Whether this field supports nullability.
+// * @param mode
+// * @throws SchemaChangeException
+// */
// public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException {
// addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
// }
- public void setSelectionVector(boolean hasSelectionVector) {
- this.hasSelectionVector = hasSelectionVector;
+ public SchemaBuilder setSelectionVectorMode(BatchSchema.SelectionVectorMode selectionVectorMode) { // Stores the mode that build() later passes to the BatchSchema constructor.
+ this.selectionVectorMode = selectionVectorMode;
+ return this; // returns this builder so calls can be chained
}
+ public SchemaBuilder addFields(Iterable<MaterializedField> fields){ // Adds each field of the iterable by delegating to addField.
+ for(MaterializedField f : fields){ // here "fields" is the parameter, which shadows the builder's own fields member
+ addField(f);
+ }
+ return this; // returns this builder so calls can be chained
+ }
// private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
// throws SchemaChangeException {
@@ -83,8 +79,9 @@ public class SchemaBuilder {
// fields.put(f.getFieldId(), f);
// }
- public void addField(MaterializedField f){
- fields.put(f.getFieldId(), f);
+ public SchemaBuilder addField(MaterializedField f){ // Adds one field to the builder's field set.
+ fields.add(f); // fields is declared as a Set, so adding a field equal to one already present changes nothing
+ return this; // returns this builder so calls can be chained
}
// public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
@@ -104,9 +101,9 @@ public class SchemaBuilder {
// setTypedField(fieldId, type, nullable, mode, valueClass);
// }
- public void removeField(short fieldId) throws SchemaChangeException{
- MaterializedField f = fields.remove(fieldId);
- if(f == null) throw new SchemaChangeException("You attempted to remove an nonexistent field.");
+ public SchemaBuilder removeField(MaterializedField f) throws SchemaChangeException{ // Removes the given field; the field itself is now the key instead of a short field id.
+ if(!fields.remove(f)) throw new SchemaChangeException("You attempted to remove an nonexistent field."); // remove() reports false when f was not in the set
+ return this; // returns this builder so calls can be chained
}
/**
@@ -114,14 +111,8 @@ public class SchemaBuilder {
* @return
* @throws SchemaChangeException
*/
- public BatchSchema build() throws SchemaChangeException {
- // check if any fields are unaccounted for.
-
- List<MaterializedField> fieldList = Lists.newArrayList();
- for (ObjectCursor<MaterializedField> f : fields.values()) {
- if (f != null) fieldList.add(f.value);
- }
- Collections.sort(fieldList);
- return new BatchSchema(this.hasSelectionVector, fieldList);
+ public BatchSchema build(){ // Creates the BatchSchema from the collected fields and the selection vector mode; no checked exception is declared any more.
+ List<MaterializedField> fieldList = Lists.newArrayList(fields); // NOTE(review): copied in the set's iteration order; the removed Collections.sort has no replacement -- confirm nothing relies on field order
+ return new BatchSchema(this.selectionVectorMode, fieldList); // NOTE(review): selectionVectorMode has no initializer, so it is null unless setSelectionVectorMode was called
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 4b97768..2e1754c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -57,50 +57,27 @@ public class WritableBatch {
public ByteBuf[] getBuffers(){
return buffers;
}
-
-// public static WritableBatch get(ValueVector[] vectors){
-// WritableCreator c = new WritableCreator();
-// for(int i =0; i < vectors.length; i++){
-// c.apply(i, vectors[i]);
-// }
-// return c.get();
-// }
-//
-
- public static WritableBatch get(int recordCount, IntObjectOpenHashMap<ValueVector> fields){
- WritableCreator creator = new WritableCreator(recordCount);
- fields.forEach(creator);
- return creator.get();
-
- }
-
- private static class WritableCreator implements IntObjectProcedure<ValueVector>{
+ /**
+ * Builds a WritableBatch from the given vectors. For every vector, its metadata is collected and each of
+ * its buffers is added to the outgoing list and retained; the vector is then asked to allocate anew at
+ * its current capacity.
+ *
+ * @param recordCount the record count written into the RecordBatchDef
+ * @param vectors the vectors whose metadata and buffers are captured, in list order
+ * @return a WritableBatch wrapping the RecordBatchDef and the collected buffers
+ */
+ public static WritableBatch get(int recordCount, List<ValueVector<?>> vectors){
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
- private int recordCount;
- public WritableCreator(int recordCount) {
- super();
- this.recordCount = recordCount;
- }
-
- @Override
- public void apply(int key, ValueVector value) {
- metadata.add(value.getMetadata());
- for(ByteBuf b : value.getBuffers()){
+ for(ValueVector<?> vv : vectors){
+ metadata.add(vv.getMetadata()); // metadata is read before the vector is re-allocated below
+ for(ByteBuf b : vv.getBuffers()){
buffers.add(b);
b.retain();
}
- value.clear();
+ // allocate new buffer to release hold on old buffer.
+ vv.allocateNew(vv.capacity()); // replaces the removed value.clear() call
}
- public WritableBatch get(){
- RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
- WritableBatch b = new WritableBatch(batchDef, buffers);
+ RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
+ WritableBatch b = new WritableBatch(batchDef, buffers);
return b;
- }
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java
new file mode 100644
index 0000000..3fc39eb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed8.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public final class NullableFixed8 extends NullableValueVector<NullableFixed8, Fixed8>{ // NullableValueVector specialization whose value vector type is Fixed8
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableFixed8.class);
+ /**
+ * Creates the vector by passing both arguments straight to the superclass constructor.
+ *
+ * @param field the field this vector is created for
+ * @param allocator the allocator handed to the superclass
+ */
+ public NullableFixed8(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ }
+ /**
+ * Supplies a new Fixed8 built from this vector's field and the given allocator.
+ *
+ * @param allocator the allocator used to construct the Fixed8
+ * @return a newly constructed Fixed8
+ */
+ @Override
+ protected Fixed8 getNewValueVector(BufferAllocator allocator) {
+ return new Fixed8(this.field, allocator);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
index 02b75ce..3546bd8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
@@ -37,5 +37,13 @@ public class SelectionVector {
public void allocateNew(int count) {
}
+<<<<<<< HEAD:sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
+=======
+
+ public final int getInt(int index){
+ index*=4;
+ return data.getInt(index);
+ }
+>>>>>>> Build working:sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java
new file mode 100644
index 0000000..c314cd4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector2.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.DeadBuf;
+
+/**
+ * A selection vector whose entries are read from the backing buffer as two-byte {@code char} values.
+ * NOTE(review): the original summary stopped mid-sentence ("fronts, at most, a"); presumably it was meant
+ * to name the largest batch a two-byte index can address -- confirm with the author.
+ */
+public class SelectionVector2{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
+
+ private final BufferAllocator allocator; // stored by the constructor; not read anywhere in this class
+ private ByteBuf buffer = DeadBuf.DEAD_BUFFER; // initialized to the shared dead buffer; nothing in this class reassigns it
+ /**
+ * @param allocator stored in the allocator field
+ */
+ public SelectionVector2(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+ /**
+ * Currently always returns -1; this class does not track a count.
+ */
+ public int getCount(){
+ return -1;
+ }
+ /**
+ * Reads a {@code char} from the buffer at the given index and returns it widened to {@code int}.
+ * NOTE(review): directIndex is passed to getChar unscaled, and ByteBuf.getChar takes an absolute byte
+ * index -- confirm whether callers are expected to pre-multiply by the two-byte entry width.
+ *
+ * @param directIndex the index handed to {@code buffer.getChar}
+ */
+ public int getIndex(int directIndex){
+ return buffer.getChar(directIndex);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java
new file mode 100644
index 0000000..d857146
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector4.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.DeadBuf;
+
+public class SelectionVector4 { // Holds an allocator and a buffer but, so far, exposes only getCount()
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
+
+ private final BufferAllocator allocator; // stored by the constructor; not read anywhere in this class
+ private ByteBuf buffer = DeadBuf.DEAD_BUFFER; // initialized to the shared dead buffer; nothing in this class reassigns it
+ /**
+ * @param allocator stored in the allocator field
+ */
+ public SelectionVector4(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+ /**
+ * Currently always returns -1; this class does not track a count.
+ */
+ public int getCount(){
+ return -1;
+ }
+
+
+}