You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:58:01 UTC
[40/53] [abbrv] Types transition
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
index eef0634..85bbdf3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
@@ -18,49 +18,37 @@
package org.apache.drill.exec.schema;
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.SchemaDefProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.vector.*;
-import org.apache.drill.exec.store.BatchExceededException;
-import org.apache.drill.exec.store.VectorHolder;
-import java.nio.charset.Charset;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.drill.exec.proto.SchemaDefProtos.*;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
public abstract class Field {
final MajorType fieldType;
- final int parentFieldId;
- final int fieldId;
final String prefixFieldName;
RecordSchema schema;
RecordSchema parentSchema;
boolean read;
- public Field(RecordSchema parentSchema, int parentFieldId, IdGenerator<Integer> generator, MajorType type, String prefixFieldName) {
- this.fieldId = generator.getNextId();
+ public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) {
fieldType = type;
this.prefixFieldName = prefixFieldName;
this.parentSchema = parentSchema;
- this.parentFieldId = parentFieldId;
}
+ public MaterializedField getAsMaterializedField(){
+ return MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType);
+ }
+
public abstract String getFieldName();
public String getFullFieldName() {
return Strings.isNullOrEmpty(prefixFieldName) ? getFieldName() : prefixFieldName + "." + getFieldName();
}
- public int getFieldId() {
- return fieldId;
- }
-
public void setRead(boolean read) {
this.read = read;
}
@@ -69,7 +57,6 @@ public abstract class Field {
Objects.ToStringHelper getAttributesStringHelper() {
return Objects.toStringHelper(this).add("type", fieldType)
- .add("id", fieldId)
.add("fullFieldName", getFullFieldName())
.add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
index e5bd1a4..77f1f37 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
@@ -18,15 +18,16 @@
package org.apache.drill.exec.schema;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.drill.exec.proto.SchemaDefProtos;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import java.util.List;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
public class ListSchema implements RecordSchema {
private List<Field> fields;
@@ -37,7 +38,7 @@ public class ListSchema implements RecordSchema {
@Override
public void addField(Field field) {
- if (field.getFieldType().getMode() == SchemaDefProtos.DataMode.REPEATED || fields.isEmpty() || !isSingleTyped() ||
+ if (field.getFieldType().getMode() == DataMode.REPEATED || fields.isEmpty() || !isSingleTyped() ||
!Iterables.getOnlyElement(fields).equals(field.getFieldType())) {
fields.add(field);
}
@@ -58,7 +59,7 @@ public class ListSchema implements RecordSchema {
@Override
public void removeField(Field field, int index) {
checkArgument(fields.size() > index);
- checkArgument(checkNotNull(fields.get(index)).getFieldId() == field.getFieldId());
+// checkArgument(checkNotNull(fields.get(index)).getFieldId() == field.getFieldId());
fields.remove(index);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
index b975ad7..556628c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
@@ -18,26 +18,25 @@
package org.apache.drill.exec.schema;
-import com.google.common.base.Objects;
-import org.apache.drill.exec.proto.SchemaDefProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+import com.google.common.base.Objects;
+
public class NamedField extends Field {
- final SchemaDefProtos.MajorType keyType;
+ final MajorType keyType;
String fieldName;
- public NamedField(RecordSchema parentSchema, int parentFieldId, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, SchemaDefProtos.MajorType fieldType) {
- this(parentSchema, parentFieldId, generator, prefixFieldName, fieldName, fieldType, JacksonHelper.STRING_TYPE);
+ public NamedField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType) {
+ this(parentSchema, prefixFieldName, fieldName, fieldType, JacksonHelper.STRING_TYPE);
}
public NamedField(RecordSchema parentSchema,
- int parentFieldId,
- IdGenerator<Integer> generator,
String prefixFieldName,
String fieldName,
- SchemaDefProtos.MajorType fieldType,
- SchemaDefProtos.MajorType keyType) {
- super(parentSchema, parentFieldId, generator, fieldType, prefixFieldName);
+ MajorType fieldType,
+ MajorType keyType) {
+ super(parentSchema, fieldType, prefixFieldName);
this.fieldName = fieldName;
this.keyType = keyType;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
index 5f514af..eec1e4b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
@@ -18,19 +18,18 @@
package org.apache.drill.exec.schema;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
import com.google.common.base.Objects;
-import org.apache.drill.exec.proto.SchemaDefProtos;
public class OrderedField extends Field {
private final int index;
public OrderedField(RecordSchema parentSchema,
- int parentFieldId,
- IdGenerator<Integer> generator,
- SchemaDefProtos.MajorType type,
+ MajorType type,
String prefixFieldName,
int index) {
- super(parentSchema, parentFieldId, generator, type, prefixFieldName);
+ super(parentSchema, type, prefixFieldName);
this.index = index;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
index 64a9d58..d0b27fc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -18,24 +18,24 @@
package org.apache.drill.exec.schema.json.jackson;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import org.apache.drill.exec.proto.SchemaDefProtos;
-
import java.io.IOException;
-import static org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
-import static org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
public class JacksonHelper {
- public static final MajorType STRING_TYPE = MajorType.newBuilder().setMinorType(MinorType.VARCHAR4).setMode(SchemaDefProtos.DataMode.OPTIONAL).build();
- public static final MajorType BOOLEAN_TYPE = MajorType.newBuilder().setMinorType(MinorType.BOOLEAN).setMode(SchemaDefProtos.DataMode.OPTIONAL).build();
- public static final MajorType ARRAY_TYPE = MajorType.newBuilder().setMinorType(MinorType.LATE).setMode(SchemaDefProtos.DataMode.REPEATED).build();
- public static final MajorType MAP_TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(SchemaDefProtos.DataMode.REPEATED).build();
- public static final MajorType INT_TYPE = MajorType.newBuilder().setMinorType(MinorType.INT).setMode(SchemaDefProtos.DataMode.OPTIONAL).build();
- public static final MajorType FLOAT_TYPE = MajorType.newBuilder().setMinorType(MinorType.FLOAT4).setMode(SchemaDefProtos.DataMode.OPTIONAL).build();
- public static final MajorType NULL_TYPE = MajorType.newBuilder().setMinorType(MinorType.LATE).setMode(SchemaDefProtos.DataMode.OPTIONAL).build();
+ public static final MajorType STRING_TYPE = MajorType.newBuilder().setMinorType(MinorType.VARCHAR4).setMode(DataMode.OPTIONAL).build();
+ public static final MajorType BOOLEAN_TYPE = MajorType.newBuilder().setMinorType(MinorType.BOOLEAN).setMode(DataMode.OPTIONAL).build();
+ public static final MajorType ARRAY_TYPE = MajorType.newBuilder().setMinorType(MinorType.LATE).setMode(DataMode.REPEATED).build();
+ public static final MajorType MAP_TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
+ public static final MajorType INT_TYPE = MajorType.newBuilder().setMinorType(MinorType.INT).setMode(DataMode.OPTIONAL).build();
+ public static final MajorType FLOAT_TYPE = MajorType.newBuilder().setMinorType(MinorType.FLOAT4).setMode(DataMode.OPTIONAL).build();
+ public static final MajorType NULL_TYPE = MajorType.newBuilder().setMinorType(MinorType.LATE).setMode(DataMode.OPTIONAL).build();
public static MajorType getFieldType(JsonToken token) {
switch(token) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
index 07ae20a..8c31aa4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java
@@ -9,16 +9,24 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.List;
+import java.util.Map;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.SchemaDefProtos;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.vector.NullableBit;
+import org.apache.drill.exec.record.vector.NullableFixed4;
+import org.apache.drill.exec.record.vector.NullableVarLen4;
+import org.apache.drill.exec.record.vector.TypeHelper;
+import org.apache.drill.exec.record.vector.ValueVector;
import org.apache.drill.exec.schema.DiffSchema;
import org.apache.drill.exec.schema.Field;
import org.apache.drill.exec.schema.IdGenerator;
@@ -29,13 +37,8 @@ import org.apache.drill.exec.schema.OrderedField;
import org.apache.drill.exec.schema.RecordSchema;
import org.apache.drill.exec.schema.SchemaIdGenerator;
import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarChar4Vector;
-import org.apache.drill.exec.vector.TypeHelper;
-import org.apache.drill.exec.vector.ValueVector;
+import com.beust.jcommander.internal.Maps;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.fasterxml.jackson.core.JsonFactory;
@@ -55,7 +58,7 @@ public class JSONRecordReader implements RecordReader {
private final String inputPath;
- private final IntObjectOpenHashMap<VectorHolder> valueVectorMap;
+ private final Map<Field, VectorHolder> valueVectorMap;
private JsonParser parser;
private SchemaIdGenerator generator;
@@ -70,7 +73,7 @@ public class JSONRecordReader implements RecordReader {
this.inputPath = inputPath;
this.allocator = fragmentContext.getAllocator();
this.batchSize = batchSize;
- valueVectorMap = new IntObjectOpenHashMap<>();
+ valueVectorMap = Maps.newHashMap();
}
public JSONRecordReader(FragmentContext fragmentContext, String inputPath) {
@@ -134,7 +137,7 @@ public class JSONRecordReader implements RecordReader {
// Garbage collect fields never referenced in this batch
for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
diffSchema.addRemovedField(field);
- outputMutator.removeField(field.getFieldId());
+ outputMutator.removeField(field.getAsMaterializedField());
}
} catch (IOException | SchemaChangeException e) {
@@ -144,8 +147,8 @@ public class JSONRecordReader implements RecordReader {
}
private void resetBatch() {
- for (ObjectCursor<VectorHolder> holder : valueVectorMap.values()) {
- holder.value.reset();
+ for (VectorHolder value : valueVectorMap.values()) {
+ value.reset();
}
currentSchema.resetMarkedFields();
@@ -162,9 +165,6 @@ public class JSONRecordReader implements RecordReader {
}
}
- private SchemaIdGenerator getGenerator() {
- return generator;
- }
private RecordSchema getCurrentSchema() {
return currentSchema;
@@ -193,8 +193,8 @@ public class JSONRecordReader implements RecordReader {
public static enum ReadType {
ARRAY(END_ARRAY) {
@Override
- public Field createField(RecordSchema parentSchema, int parentFieldId, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, SchemaDefProtos.MajorType fieldType, int index) {
- return new OrderedField(parentSchema, parentFieldId, generator, fieldType, prefixFieldName, index);
+ public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
+ return new OrderedField(parentSchema, fieldType, prefixFieldName, index);
}
@Override
@@ -205,13 +205,11 @@ public class JSONRecordReader implements RecordReader {
OBJECT(END_OBJECT) {
@Override
public Field createField(RecordSchema parentSchema,
- int parentFieldId,
- IdGenerator<Integer> generator,
String prefixFieldName,
String fieldName,
- SchemaDefProtos.MajorType fieldType,
+ MajorType fieldType,
int index) {
- return new NamedField(parentSchema, parentFieldId, generator, prefixFieldName, fieldName, fieldType);
+ return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
}
@Override
@@ -246,7 +244,7 @@ public class JSONRecordReader implements RecordReader {
}
String fieldName = parser.getCurrentName();
- SchemaDefProtos.MajorType fieldType = JacksonHelper.getFieldType(token);
+ MajorType fieldType = JacksonHelper.getFieldType(token);
ReadType readType = null;
switch (token) {
case START_ARRAY:
@@ -289,7 +287,7 @@ public class JSONRecordReader implements RecordReader {
private boolean recordData(Field parentField,
JSONRecordReader.ReadType readType,
JSONRecordReader reader,
- SchemaDefProtos.MajorType fieldType,
+ MajorType fieldType,
String prefixFieldName,
String fieldName,
int rowIndex,
@@ -298,7 +296,6 @@ public class JSONRecordReader implements RecordReader {
Field field = currentSchema.getField(fieldName, colIndex);
boolean isFieldFound = field != null;
List<Field> removedFields = reader.getRemovedFields();
- int parentFieldId = parentField == null ? 0 : parentField.getFieldId();
if (!isFieldFound || !field.getFieldType().equals(fieldType)) {
if (isFieldFound) {
if (field.hasSchema()) {
@@ -310,8 +307,6 @@ public class JSONRecordReader implements RecordReader {
field = createField(
currentSchema,
- parentFieldId,
- reader.getGenerator(),
prefixFieldName,
fieldName,
fieldType,
@@ -324,7 +319,7 @@ public class JSONRecordReader implements RecordReader {
field.setRead(true);
- VectorHolder holder = getOrCreateVectorHolder(reader, field, parentFieldId);
+ VectorHolder holder = getOrCreateVectorHolder(reader, field);
if (readType != null) {
RecordSchema fieldSchema = field.getAssignedSchema();
reader.setCurrentSchema(fieldSchema);
@@ -352,7 +347,7 @@ public class JSONRecordReader implements RecordReader {
return true;
}
- private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, SchemaDefProtos.MinorType minorType) {
+ private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType) {
switch (minorType) {
case INT: {
holder.incAndCheckLength(32);
@@ -398,18 +393,16 @@ public class JSONRecordReader implements RecordReader {
}
}
- private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field, int parentFieldId) throws SchemaChangeException {
- return reader.getOrCreateVectorHolder(field, parentFieldId);
+ private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
+ return reader.getOrCreateVectorHolder(field);
}
public abstract RecordSchema createSchema() throws IOException;
public abstract Field createField(RecordSchema parentSchema,
- int parentFieldId,
- IdGenerator<Integer> generator,
String prefixFieldName,
String fieldName,
- SchemaDefProtos.MajorType fieldType,
+ MajorType fieldType,
int index);
}
@@ -417,20 +410,19 @@ public class JSONRecordReader implements RecordReader {
diffSchema.recordNewField(field);
}
- private VectorHolder getOrCreateVectorHolder(Field field, int parentFieldId) throws SchemaChangeException {
- if (!valueVectorMap.containsKey(field.getFieldId())) {
- SchemaDefProtos.MajorType type = field.getFieldType();
- int fieldId = field.getFieldId();
- MaterializedField f = MaterializedField.create(new SchemaPath(field.getFieldName()), fieldId, parentFieldId, type);
-
- ValueVector v = TypeHelper.getNewVector(f, allocator);
- VectorHolder holder = new VectorHolder(batchSize, v);
- holder.allocateNew(batchSize);
-
- valueVectorMap.put(fieldId, holder);
- outputMutator.addField(fieldId, v);
+ private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
+ VectorHolder holder = valueVectorMap.get(field);
+
+ if (holder == null) {
+ MajorType type = field.getFieldType();
+ MaterializedField f = MaterializedField.create(new SchemaPath(field.getFieldName(), ExpressionPosition.UNKNOWN), type);
+ ValueVector<?> v = TypeHelper.getNewVector(f, allocator);
+ v.allocateNew(batchSize);
+ holder = new VectorHolder(batchSize, v);
+ valueVectorMap.put(field, holder);
+ outputMutator.addField(v);
return holder;
}
- return valueVectorMap.lget();
+ return holder;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index edda714..2829dfd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
@@ -113,7 +114,7 @@ public class BitComHandlerImpl implements BitComHandler {
@Override
public void startNewRemoteFragment(PlanFragment fragment){
logger.debug("Received remote fragment start instruction", fragment);
- FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null);
+ FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null,new FunctionImplementationRegistry(bee.getContext().getConfig()));
BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
RemotingFragmentRunnerListener listener = new RemotingFragmentRunnerListener(context, tunnel);
try{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 5bb1ff2..1a4bc6c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
@@ -74,7 +75,7 @@ class RunningFragmentManager implements FragmentStatusListener{
{
IncomingBuffers buffers = new IncomingBuffers(rootOperator);
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers);
+ FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment.getHandle(), rootClient, buffers, new FunctionImplementationRegistry(bee.getContext().getConfig()));
RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
// add fragment to local node.
map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index 4a5dbf2..e4d0cfc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentLeaf;
import org.apache.drill.exec.physical.base.FragmentRoot;
@@ -56,7 +57,7 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
this.fragment = fragment;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
this.buffers = new IncomingBuffers(root);
- this.context = new FragmentContext(context, fragment.getHandle(), null, buffers);
+ this.context = new FragmentContext(context, fragment.getHandle(), null, buffers, new FunctionImplementationRegistry(context.getConfig()));
this.runnerListener = new RemotingFragmentRunnerListener(this.context, foremanTunnel);
this.reader = context.getPlanReader();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
index de0009a..a360cea 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
@@ -4,61 +4,8 @@ option java_package = "org.apache.drill.exec.proto";
option java_outer_classname = "SchemaDefProtos";
option optimize_for = SPEED;
+import "Types.proto";
-// Schema Definitions //
-enum MinorType {
- LATE = 0; // late binding type
- MAP = 1; // an empty map column. Useful for conceptual setup. Children listed within here
- REPEATMAP = 2; // a repeated map column (means that multiple children sit below this)
- TINYINT = 3; // single byte signed integer
- SMALLINT = 4; // two byte signed integer
- INT = 5; // four byte signed integer
- BIGINT = 6; // eight byte signed integer
- DECIMAL4 = 7; // a decimal supporting precision between 1 and 8 (4 bits for decimal location, 1 sign)
- DECIMAL8 = 8; // a decimal supporting precision between 9 and 18 (5 bits for decimal location, 1 sign)
- DECIMAL12 = 9; // a decimal supporting precision between19 and 28 (5 bits for decimal location, 1 sign)
- DECIMAL16 = 10; // a decimal supporting precision between 29 and 37 (6 bits for decimal location, 1 sign)
- MONEY = 11; // signed decimal with two digit precision
- DATE = 12; // days since 4713bc
- TIME = 13; // time in micros before or after 2000/1/1
- TIMETZ = 14; // time in micros before or after 2000/1/1 with timezone
- TIMESTAMP = 15; // unix epoch time in millis
- DATETIME = 16; // TBD
- INTERVAL = 17; // TBD
- FLOAT4 = 18; // 4 byte ieee 754
- FLOAT8 = 19; // 8 byte ieee 754
- BOOLEAN = 20; // single bit value
- FIXEDCHAR = 21; // utf8 fixed length string, padded with spaces
- VARCHAR1 = 22; // utf8 variable length string (up to 2^8 in length)
- VARCHAR2 = 23; // utf8 variable length string (up to 2^16 in length)
- VARCHAR4 = 24; // utf8 variable length string (up to 2^32 in length)
- FIXEDBINARY = 25; // fixed length binary, padded with 0 bytes
- VARBINARY1 = 26; // variable length binary (up to 2^8 in length)
- VARBINARY2 = 27; // variable length binary (up to 2^16 in length)
- VARBINARY4 = 28; // variable length binary (up to 2^32 in length)
- UINT1 = 29; // unsigned 1 byte integer
- UINT2 = 30; // unsigned 2 byte integer
- UINT4 = 31; // unsigned 4 byte integer
- UINT8 = 32; // unsigned 8 byte integer
- PROTO2 = 33; // protobuf encoded complex type. (up to 2^16 in length)
- PROTO4 = 34; // protobuf encoded complex type. (up to 2^32 in length)
- MSGPACK2 = 35; // msgpack encoded complex type. (up to 2^16 in length)
- MSGPACK4 = 36; // msgpack encoded complex type. (up to 2^32 in length)
-}
-
-message MajorType {
- optional MinorType minor_type = 1;
- optional DataMode mode = 2;
- optional int32 width = 3; // optional width for fixed size values.
- optional int32 precision = 4; // used for decimal types
- optional int32 scale = 5; // used for decimal types
-}
-
-enum DataMode {
- OPTIONAL = 0; // nullable
- REQUIRED = 1; // non-nullable
- REPEATED = 2; // single, repeated-field
-}
enum ValueMode {
VALUE_VECTOR = 0;
@@ -77,10 +24,8 @@ message NamePart {
}
message FieldDef {
- optional int32 field_id = 1;
- optional int32 parent_id = 2; // the field_id of the parent of this field. populated when this is a repeated field. a field_id of 0 means that the record is the parent of this repeated field.
- repeated NamePart name = 3; // multipart description of entire field name
- optional MajorType major_type = 4; // the type associated with this field.
- repeated FieldDef field = 5; // only in the cases of type == MAP or REPEATMAP
+ repeated NamePart name = 1; // multipart description of entire field name
+ optional common.MajorType major_type = 2; // the type associated with this field.
+ repeated FieldDef field = 3; // only in the cases of type == MAP or REPEATMAP
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index ad18d6e..3ce903d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -16,12 +16,13 @@ drill.exec: {
root: "/drill",
refresh: 500,
timeout: 1000,
- retry: {
- count: 7200,
- delay: 500
- }
- }
-
+ retry: {
+ count: 7200,
+ delay: 500
+ }
+ },
+ functions: ["org.apache.drill.expr.fn.impl"],
+
network: {
start: 35000
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/SortTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/SortTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/SortTest.java
new file mode 100644
index 0000000..3e38a0e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/SortTest.java
@@ -0,0 +1,61 @@
+package org.apache.drill.exec;
+
+import java.util.Random;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public class SortTest {
+ private static final int RECORD_COUNT = 10*1000*1000;
+ private static final int KEY_SIZE = 10;
+ private static final int DATA_SIZE = 90;
+ private static final int RECORD_SIZE = KEY_SIZE + DATA_SIZE;
+
+ private byte[] data;
+
+ public static void main(String[] args) throws Exception{
+ for(int i =0; i < 100; i++){
+ SortTest st = new SortTest();
+ long nanos = st.doSort();
+ System.out.print("Sort Completed in ");
+ System.out.print(nanos);
+ System.out.println(" ns.");
+ }
+ }
+
+ SortTest(){
+ System.out.print("Generating data... ");
+ data = new byte[RECORD_SIZE*RECORD_COUNT];
+ Random r = new Random();
+ r.nextBytes(data);
+ System.out.print("Data generated. ");
+ }
+
+ public long doSort(){
+ QuickSort qs = new QuickSort();
+ ByteSortable b = new ByteSortable();
+ long nano = System.nanoTime();
+ qs.sort(b, 0, RECORD_COUNT);
+ return System.nanoTime() - nano;
+ }
+
+ private class ByteSortable implements IndexedSortable{
+ final byte[] space = new byte[RECORD_SIZE];
+ final BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+
+ @Override
+ public int compare(int index1, int index2) {
+ return comparator.compare(data, index1*RECORD_SIZE, KEY_SIZE, data, index2*RECORD_SIZE, KEY_SIZE);
+ }
+
+ @Override
+ public void swap(int index1, int index2) {
+ int start1 = index1*RECORD_SIZE;
+ int start2 = index2*RECORD_SIZE;
+ System.arraycopy(data, start1, space, 0, RECORD_SIZE);
+ System.arraycopy(data, start2, data, start1, RECORD_SIZE);
+ System.arraycopy(space, 0, data, start2, RECORD_SIZE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
new file mode 100644
index 0000000..16f7802
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
@@ -0,0 +1,108 @@
+package org.apache.drill.exec.expr;
+
+import static org.junit.Assert.assertEquals;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FunctionRegistry;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
+import org.apache.drill.common.expression.parser.ExprParser.parse_return;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.vector.Fixed4;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.ILoggerFactory;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.LoggerContext;
+
+public class ExpressionTest {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTest.class);
+
+ @Test
+ public void testBasicExpression(@Injectable RecordBatch batch) throws Exception {
+ System.out.println(getExpressionCode("if(true) then 1 else 0 end", batch));
+ }
+
+ @Test
+ public void testSpecial(final @Injectable RecordBatch batch) throws Exception {
+ final TypedFieldId tfid = new TypedFieldId(MajorType.newBuilder().setMode(DataMode.OPTIONAL)
+ .setMinorType(MinorType.INT).build(), 0);
+
+ new NonStrictExpectations() {
+ {
+ batch.getValueVector(new SchemaPath("alpha", ExpressionPosition.UNKNOWN));
+ result = tfid;
+ batch.getValueVectorById(tfid.getFieldId(), Fixed4.class);
+ result = new Fixed4(null, null);
+ }
+
+ };
+ System.out.println(getExpressionCode("1 + 1", batch));
+ }
+
+ @Test
+ public void testSchemaExpression(final @Injectable RecordBatch batch) throws Exception {
+ final TypedFieldId tfid = new TypedFieldId(MajorType.newBuilder().setMode(DataMode.OPTIONAL)
+ .setMinorType(MinorType.BIGINT).build(), 0);
+
+ new Expectations() {
+ {
+ batch.getValueVector(new SchemaPath("alpha", ExpressionPosition.UNKNOWN));
+ result = tfid;
+ // batch.getValueVectorById(tfid); result = new Fixed4(null, null);
+ }
+
+ };
+ System.out.println(getExpressionCode("1 + alpha", batch));
+
+ }
+
+ // HELPER METHODS //
+
+ private LogicalExpression parseExpr(String expr) throws RecognitionException {
+ ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
+ CommonTokenStream tokens = new CommonTokenStream(lexer);
+ ExprParser parser = new ExprParser(tokens);
+ parser.setRegistry(new FunctionRegistry(DrillConfig.create()));
+ parse_return ret = parser.parse();
+ return ret.e;
+ }
+
+ private String getExpressionCode(String expression, RecordBatch batch) throws Exception {
+ LogicalExpression expr = parseExpr(expression);
+ ErrorCollector error = new ErrorCollectorImpl();
+ LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, batch, error);
+ if (error.getErrorCount() != 0) {
+ logger.error("Failure while materializing expression [{}]. Errors: {}", expression, error);
+ assertEquals(0, error.getErrorCount());
+ }
+
+ CodeGenerator cg = new CodeGenerator("setup", "eval", new FunctionImplementationRegistry(DrillConfig.create()));
+ cg.addNextWrite(new ValueVectorWriteExpression(-1, materializedExpr));
+ return cg.generate();
+ }
+
+ @After
+ public void tearDown() throws Exception{
+ // pause to get logger to catch up.
+ Thread.sleep(1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
new file mode 100644
index 0000000..d2655e1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl;
+
+import com.google.caliper.runner.CaliperMain;
+
+
+public class PerformanceTests {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PerformanceTests.class);
+
+
+ public static void main(String[] args){
+ CaliperMain.main(TestExecutionAbstractions.class, args);
+ System.out.println("Hello");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java
new file mode 100644
index 0000000..31f09af
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java
@@ -0,0 +1,226 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.physical.impl;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import sun.misc.Unsafe;
+import com.google.caliper.Benchmark;
+import com.google.caliper.Param;
+
+@SuppressWarnings("restriction")
+public class TestExecutionAbstractions extends Benchmark {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExecutionAbstractions.class);
+
+ /**
+ * General goal is compare the performance of abstract versus concrete
+ * implementations of selection vector dereferencing.
+ */
+
+ private static enum Implementation {
+ CONCRETE, ABSTRACT
+ };
+
+ private static enum SelectionVectorMode {
+ NONE, SV2, SV4
+ };
+
+ @Param
+ private Implementation impl;
+ @Param
+ private SelectionVectorMode mode;
+
+ private int scale = 1024*1024*8;
+
+ private final Unsafe unsafe = retrieveUnsafe();
+ private final ByteBuffer a;
+ private final ByteBuffer b;
+ private final ByteBuffer b2;
+ private final ByteBuffer c;
+ private final ByteBuffer sv2;
+ private final ByteBuffer sv4;
+ private final int max;
+
+
+ public TestExecutionAbstractions() throws Exception {
+ sv2 = ByteBuffer.allocateDirect(scale * 2);
+ sv4 = ByteBuffer.allocateDirect(scale * 4);
+ a = ByteBuffer.allocateDirect(scale * 8);
+ b = ByteBuffer.allocateDirect(scale * 8);
+ b2 = ByteBuffer.allocateDirect(scale * 8);
+ c = ByteBuffer.allocateDirect(scale * 8);
+ int svPos = 0;
+ int i = 0;
+ try {
+
+ Random r = new Random();
+ for (; i < scale; i++) {
+ a.putLong(i * 8, r.nextLong());
+ b.putLong(i * 8, r.nextLong());
+
+ if (r.nextBoolean()) {
+ sv2.putChar(svPos * 2, (char) i);
+ sv4.putInt(svPos * 4, i);
+ svPos++;
+ }
+ }
+ System.out.println("Created test data.");
+ max = mode == SelectionVectorMode.NONE ? 1024 : svPos;
+
+ } catch (Exception ex) {
+ System.out.println("i: " + i + ", svPos" + svPos);
+ throw ex;
+ }
+ }
+
+ private Unsafe retrieveUnsafe(){
+ sun.misc.Unsafe localUnsafe = null;
+
+ try {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ localUnsafe = (sun.misc.Unsafe) field.get(null);
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+
+ return localUnsafe;
+ }
+
+ public void timeAdd(int reps) {
+ for (int r = 0; r < reps; r++) {
+ switch (impl) {
+
+ case CONCRETE:
+ switch (mode) {
+
+ case NONE:
+ for (int i = 0; i < max; i++) {
+
+ c.putLong(i * 8, a.getLong(i * 8) + b.getLong(i * 8));
+ }
+
+ break;
+ case SV2:
+ for (int i = 0; i < max; i++) {
+ int index = sv2.getChar(i*2) * 8;
+ c.putLong(i * 8, a.getLong(index) + b.getLong(index));
+ }
+ break;
+ case SV4:
+ for (int i = 0; i < max; i++) {
+ int index = sv4.getInt(i*4) * 8;
+ c.putLong(i * 8, a.getLong(index) + b.getLong(index));
+ }
+ break;
+ }
+ break;
+ case ABSTRACT:
+ LongGetter aGetter = null;
+ LongGetter bGetter = null;
+
+ switch (mode) {
+
+ case NONE:
+ aGetter = new StraightGetter(a);
+ bGetter = new StraightGetter(b);
+ break;
+ case SV2:
+ aGetter = new Sv2Getter(sv2, a);
+ bGetter = new Sv2Getter(sv2, b);
+ break;
+ case SV4:
+ aGetter = new Sv4Getter(sv4, a);
+ bGetter = new Sv4Getter(sv4, b);
+ break;
+
+ }
+
+ for (int i = 0; i < max; i++) {
+ c.putLong(i * 8, aGetter.getLong(i) + bGetter.getLong(i));
+ }
+ break;
+ }
+ }
+
+ }
+
+ private static interface LongGetter {
+ long getLong(int index);
+ }
+
+ private static class StraightGetter implements LongGetter {
+
+ final ByteBuffer b;
+
+ public StraightGetter(ByteBuffer b) {
+ super();
+ this.b = b;
+ }
+
+ @Override
+ public long getLong(int index) {
+ return b.getLong(index * 8);
+ }
+ }
+
+ private static class Sv2Getter implements LongGetter {
+ final ByteBuffer b;
+ final ByteBuffer sv;
+
+ public Sv2Getter(ByteBuffer sv, ByteBuffer b) {
+ super();
+ this.b = b;
+ this.sv = sv;
+ }
+
+ @Override
+ public long getLong(int index) {
+ int pos = sv.getChar(index * 2);
+ return b.getLong(pos * 8);
+ }
+ }
+
+ private static class Sv4Getter implements LongGetter {
+ final ByteBuffer b;
+ final ByteBuffer sv;
+
+ public Sv4Getter(ByteBuffer sv, ByteBuffer b) {
+ super();
+ this.b = b;
+ this.sv = sv;
+ }
+
+ @Override
+ public long getLong(int index) {
+ int pos = sv.getInt(index * 4);
+ return b.getLong(pos * 8);
+ }
+ }
+
+ private long allocate(long bytes){
+ return unsafe.allocateMemory(bytes);
+
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index cac6aa2..3dc961b 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
@@ -58,33 +57,37 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
boolean schemaChanged = batchLoader.load(batch.getHeader().getDef(), batch.getData());
boolean firstColumn = true;
- // print headers.
- if (schemaChanged) {
- System.out.println("\n\n========NEW SCHEMA=========\n\n");
- for (IntObjectCursor<ValueVector> v : batchLoader) {
+ // print headers.
+ if (schemaChanged) {
+ System.out.println("\n\n========NEW SCHEMA=========\n\n");
+ for (ValueVector<?> value : batchLoader) {
- if (firstColumn) {
- firstColumn = false;
- } else {
- System.out.print("\t");
+ if (firstColumn) {
+ firstColumn = false;
+ } else {
+ System.out.print("\t");
+ }
+ System.out.print(value.getField().getName());
+ System.out.print("[");
+ System.out.print(value.getField().getType().getMinorType());
+ System.out.print("]");
+ }
+ System.out.println();
}
- System.out.print(v.value.getField().getName());
- System.out.print("[");
- System.out.print(v.value.getField().getType().getMinorType());
- System.out.print("]");
- }
- System.out.println();
- }
- for (int i = 0; i < batchLoader.getRecordCount(); i++) {
- boolean first = true;
- recordCount++;
- for (IntObjectCursor<ValueVector> v : batchLoader) {
- if (first) {
- first = false;
- } else {
- System.out.print("\t");
+ for (int i = 0; i < batchLoader.getRecordCount(); i++) {
+ boolean first = true;
+ recordCount++;
+ for (ValueVector<?> value : batchLoader) {
+ if (first) {
+ first = false;
+ } else {
+ System.out.print("\t");
+ }
+ System.out.print(value.getObject(i));
+ }
+ if(!first) System.out.println();
}
System.out.print(v.value.getAccessor().getObject(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ce0da88d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index ab68ea2..99bf820 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -1,110 +1,203 @@
package org.apache.drill.exec.record;
-import com.beust.jcommander.internal.Lists;
-import com.google.common.collect.Range;
-import org.apache.drill.common.expression.*;
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.junit.Test;
-
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.List;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.expression.ArgumentValidator;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FunctionDefinition;
+import org.apache.drill.common.expression.IfExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.OutputTypeDeterminer;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
+import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.junit.Test;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Range;
+
public class ExpressionTreeMaterializerTest {
- @Test
- public void testMaterializingConstantTree() throws SchemaChangeException {
- ExpressionTreeMaterializer tm = new ExpressionTreeMaterializer();
- ErrorCollector ec = new ErrorCollectorImpl();
- BatchSchema schema = new BatchSchema.BatchSchemaBuilder().buildAndClear();
- LogicalExpression expr = tm.Materialize(new ValueExpressions.LongExpression(1L), schema, ec);
- assertTrue(expr instanceof ValueExpressions.LongExpression);
- assertEquals(1L, ValueExpressions.LongExpression.class.cast(expr).getLong());
- assertFalse(ec.hasErrors());
- }
-
- @Test
- public void testMaterializingLateboundField() throws SchemaChangeException {
- ExpressionTreeMaterializer tm = new ExpressionTreeMaterializer();
- ErrorCollector ec = new ErrorCollectorImpl();
- BatchSchema.BatchSchemaBuilder builder = new BatchSchema.BatchSchemaBuilder();
- builder.addTypedField((short) 2, DataType.INT64, false, RecordField.ValueMode.RLE, Long.class);
- LogicalExpression expr = tm.Materialize(new FieldReference("test"), builder.buildAndClear(), ec);
- assertEquals(DataType.INT64, expr.getDataType());
- assertFalse(ec.hasErrors());
- }
-
- @Test
- public void testMaterializingLateboundTree() throws SchemaChangeException {
- ExpressionTreeMaterializer tm = new ExpressionTreeMaterializer();
- ErrorCollector ec = new ErrorCollectorImpl();
- BatchSchema.BatchSchemaBuilder builder = new BatchSchema.BatchSchemaBuilder();
- builder.addTypedField((short) 2, DataType.INT64, false, RecordField.ValueMode.RLE, Long.class);
- LogicalExpression expr = new IfExpression.Builder().addCondition(
- new IfExpression.IfCondition(new FieldReference("test"),
- new IfExpression.Builder().addCondition(new IfExpression.IfCondition(new ValueExpressions.LongExpression(1L), new FieldReference("test1"))).build()
- )
- ).build();
- LogicalExpression newExpr = tm.Materialize(expr, builder.buildAndClear(), ec);
- assertTrue(newExpr instanceof IfExpression);
- IfExpression newIfExpr = (IfExpression) newExpr;
- assertEquals(1, newIfExpr.conditions.size());
- IfExpression.IfCondition ifCondition = newIfExpr.conditions.get(0);
- assertEquals(DataType.INT64, ifCondition.condition.getDataType());
- assertTrue(ifCondition.expression instanceof IfExpression);
- newIfExpr = (IfExpression) ifCondition.expression;
- assertEquals(1, newIfExpr.conditions.size());
- ifCondition = newIfExpr.conditions.get(0);
- assertEquals(DataType.INT64, ifCondition.expression.getDataType());
- assertEquals(1L, ((ValueExpressions.LongExpression) ifCondition.condition).getLong());
- assertFalse(ec.hasErrors());
- }
-
- @Test
- public void testMaterializingLateboundTreeValidated() throws SchemaChangeException {
- ExpressionTreeMaterializer tm = new ExpressionTreeMaterializer();
- ErrorCollector ec = new ErrorCollector() {
- boolean errorFound = false;
- @Override
- public void addGeneralError(String expr, String s) {errorFound = true;}
- @Override
- public void addUnexpectedArgumentType(String expr, String name, DataType actual, DataType[] expected, int argumentIndex) {}
- @Override
- public void addUnexpectedArgumentCount(String expr, int actual, Range<Integer> expected) {}
- @Override
- public void addUnexpectedArgumentCount(String expr, int actual, int expected) {}
- @Override
- public void addNonNumericType(String expr, DataType actual) {}
- @Override
- public void addUnexpectedType(String expr, int index, DataType actual) {}
- @Override
- public void addExpectedConstantValue(String expr, int actual, String s) {}
- @Override
- public boolean hasErrors() { return errorFound; }
- @Override
- public String toErrorString() { return ""; }
- };
- BatchSchema.BatchSchemaBuilder builder = new BatchSchema.BatchSchemaBuilder();
- builder.addTypedField((short) 2, DataType.INT64, false, RecordField.ValueMode.RLE, Long.class);
- LogicalExpression expr = new FunctionCall(FunctionDefinition.simple("testFunc", new ArgumentValidator() {
- @Override
- public void validateArguments(String expr, List<LogicalExpression> expressions, ErrorCollector errors) {
- errors.addGeneralError(expr, "Error!");
- }
-
- @Override
- public String[] getArgumentNamesByPosition() {
- return new String[0];
- }
- }, OutputTypeDeterminer.FIXED_BOOLEAN), Lists.newArrayList((LogicalExpression) new FieldReference("test")));
- LogicalExpression newExpr = tm.Materialize(expr, builder.buildAndClear(), ec);
- assertTrue(newExpr instanceof FunctionCall);
- FunctionCall funcExpr = (FunctionCall) newExpr;
- assertEquals(1, funcExpr.args.size());
- assertEquals(DataType.INT64, funcExpr.args.get(0).getDataType());
- assertTrue(ec.hasErrors());
- }
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializerTest.class);
+
+ final MajorType boolConstant = MajorType.newBuilder().setMode(DataMode.REQUIRED).setMinorType(MinorType.BOOLEAN).build();
+ final MajorType bigIntType = MajorType.newBuilder().setMode(DataMode.REQUIRED).setMinorType(MinorType.BIGINT).build();
+ final MajorType intType = MajorType.newBuilder().setMode(DataMode.REQUIRED).setMinorType(MinorType.INT).build();
+
+ private MaterializedField getField(int fieldId, String name, MajorType type) {
+ return new MaterializedField(FieldDef.newBuilder().setMajorType(type)
+ .addName(NamePart.newBuilder().setName(name)).build());
+ }
+
+
+ @Test
+ public void testMaterializingConstantTree(@Injectable RecordBatch batch) throws SchemaChangeException {
+
+ ErrorCollector ec = new ErrorCollectorImpl();
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(new ValueExpressions.LongExpression(1L, ExpressionPosition.UNKNOWN), batch, ec);
+ assertTrue(expr instanceof ValueExpressions.LongExpression);
+ assertEquals(1L, ValueExpressions.LongExpression.class.cast(expr).getLong());
+ assertFalse(ec.hasErrors());
+ }
+
+ @Test
+ public void testMaterializingLateboundField(final @Injectable RecordBatch batch) throws SchemaChangeException {
+ final SchemaBuilder builder = BatchSchema.newBuilder();
+ builder.addField(getField(2, "test", bigIntType));
+ final BatchSchema schema = builder.build();
+
+ new NonStrictExpectations() {
+ {
+ batch.getValueVector(new FieldReference("test", ExpressionPosition.UNKNOWN)); result = new TypedFieldId(Types.required(MinorType.BIGINT), -5);
+ }
+ };
+
+ ErrorCollector ec = new ErrorCollectorImpl();
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(new FieldReference("test",
+ ExpressionPosition.UNKNOWN), batch, ec);
+ assertEquals(bigIntType, expr.getMajorType());
+ assertFalse(ec.hasErrors());
+ }
+
+ @Test
+ public void testMaterializingLateboundTree(final @Injectable RecordBatch batch) throws SchemaChangeException {
+ new NonStrictExpectations() {
+ {
+ batch.getValueVector(new FieldReference("test", ExpressionPosition.UNKNOWN)); result = new TypedFieldId(Types.required(MinorType.BOOLEAN), -4);
+ batch.getValueVector(new FieldReference("test1", ExpressionPosition.UNKNOWN)); result = new TypedFieldId(Types.required(MinorType.BIGINT), -5);
+ }
+ };
+
+ ErrorCollector ec = new ErrorCollectorImpl();
+
+
+ LogicalExpression expr = new IfExpression.Builder()
+ .addCondition(
+ new IfExpression.IfCondition( //
+ new FieldReference("test", ExpressionPosition.UNKNOWN), //
+ new IfExpression.Builder() //
+ .addCondition( //
+ new IfExpression.IfCondition( //
+ new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), new FieldReference(
+ "test1", ExpressionPosition.UNKNOWN)))
+ .setElse(new ValueExpressions.LongExpression(1L, ExpressionPosition.UNKNOWN)).build()) //
+ ) //
+ .setElse(new ValueExpressions.LongExpression(1L, ExpressionPosition.UNKNOWN)).build();
+ LogicalExpression newExpr = ExpressionTreeMaterializer.materialize(expr, batch, ec);
+ assertTrue(newExpr instanceof IfExpression);
+ IfExpression newIfExpr = (IfExpression) newExpr;
+ assertEquals(1, newIfExpr.conditions.size());
+ IfExpression.IfCondition ifCondition = newIfExpr.conditions.get(0);
+ assertTrue(ifCondition.expression instanceof IfExpression);
+ newIfExpr = (IfExpression) ifCondition.expression;
+ assertEquals(1, newIfExpr.conditions.size());
+ ifCondition = newIfExpr.conditions.get(0);
+ assertEquals(bigIntType, ifCondition.expression.getMajorType());
+ assertEquals(true, ((ValueExpressions.BooleanExpression) ifCondition.condition).value);
+ if (ec.hasErrors()) System.out.println(ec.toErrorString());
+ assertFalse(ec.hasErrors());
+ }
+
+ @Test
+ public void testMaterializingLateboundTreeValidated(final @Injectable RecordBatch batch) throws SchemaChangeException {
+ ErrorCollector ec = new ErrorCollector() {
+ int errorCount = 0;
+
+ @Override
+ public void addGeneralError(ExpressionPosition expr, String s) {
+ errorCount++;
+ }
+
+ @Override
+ public void addUnexpectedArgumentType(ExpressionPosition expr, String name, MajorType actual, MajorType[] expected,
+ int argumentIndex) {
+ errorCount++;
+ }
+
+ @Override
+ public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, Range<Integer> expected) {
+ errorCount++;
+ }
+
+ @Override
+ public void addUnexpectedArgumentCount(ExpressionPosition expr, int actual, int expected) {
+ errorCount++;
+ }
+
+ @Override
+ public void addNonNumericType(ExpressionPosition expr, MajorType actual) {
+ errorCount++;
+ }
+
+ @Override
+ public void addUnexpectedType(ExpressionPosition expr, int index, MajorType actual) {
+ errorCount++;
+ }
+
+ @Override
+ public void addExpectedConstantValue(ExpressionPosition expr, int actual, String s) {
+ errorCount++;
+ }
+
+ @Override
+ public boolean hasErrors() {
+ return errorCount > 0;
+ }
+
+ @Override
+ public String toErrorString() {
+ return String.format("Found %s errors.", errorCount);
+ }
+
+ @Override
+ public int getErrorCount() {
+ return errorCount;
+ }
+ };
+
+ new NonStrictExpectations() {
+ {
+ batch.getValueVector(new FieldReference("test", ExpressionPosition.UNKNOWN)); result = new TypedFieldId(Types.required(MinorType.BIGINT), -5);
+ }
+ };
+
+ LogicalExpression functionCallExpr = new FunctionCall(FunctionDefinition.simple("testFunc",
+ new ArgumentValidator() {
+ @Override
+ public void validateArguments(ExpressionPosition expr, List<LogicalExpression> expressions, ErrorCollector errors) {
+ errors.addGeneralError(expr, "Error!");
+ }
+
+ @Override
+ public String[] getArgumentNamesByPosition() {
+ return new String[0];
+ }
+ }, OutputTypeDeterminer.FIXED_BOOLEAN), Lists.newArrayList((LogicalExpression) new FieldReference("test",
+ ExpressionPosition.UNKNOWN)), ExpressionPosition.UNKNOWN);
+ LogicalExpression newExpr = ExpressionTreeMaterializer.materialize(functionCallExpr, batch, ec);
+ assertTrue(newExpr instanceof FunctionCall);
+ FunctionCall funcExpr = (FunctionCall) newExpr;
+ assertEquals(1, funcExpr.args.size());
+ assertEquals(bigIntType, funcExpr.args.get(0).getMajorType());
+ assertEquals(1, ec.getErrorCount());
+ System.out.println(ec.toErrorString());
+ }
}