You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/20 03:18:53 UTC
[45/51] [abbrv] Move to Optiq 0.6. Also includes: - improve exception
catching - move schema path parsing to Antlr - close the zookeeper connection
if the client created it - enhance BaseTestQuery and have other query tests utilize it
- various test fixes for better m[... subject truncated in the archive]
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
index 080679b..30cde91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
@@ -50,14 +50,14 @@ import com.google.common.io.Files;
* methods and fields of the class to merge to the class that is being visited.
*/
class MergeAdapter extends ClassVisitor {
-
+
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeAdapter.class);
-
+
private ClassNode classToMerge;
private ClassSet set;
-
+
private Set<String> mergingNames = Sets.newHashSet();
-
+
private MergeAdapter(ClassSet set, ClassVisitor cv, ClassNode cn) {
super(Opcodes.ASM4, cv);
this.classToMerge = cn;
@@ -88,16 +88,16 @@ class MergeAdapter extends ClassVisitor {
System.out.println("Annotation");
return super.visitAnnotation(desc, visible);
}
-
+
// visit the class
public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
// use the access and names of the impl class.
if(name.contains("$")){
super.visit(version, access, name, signature, superName, interfaces);
}else{
- super.visit(version, access ^ Modifier.ABSTRACT | Modifier.FINAL, name, signature, superName, interfaces);
+ super.visit(version, access ^ Modifier.ABSTRACT | Modifier.FINAL, name, signature, superName, interfaces);
}
-
+
// this.cname = name;
}
@@ -107,8 +107,8 @@ class MergeAdapter extends ClassVisitor {
// skip all abstract methods as they should have implementations.
if ((access & Modifier.ABSTRACT) != 0 || mergingNames.contains(arg1)) {
-
- logger.debug("Skipping copy of '{}()' since it is abstract or listed elsewhere.", arg1);
+
+// logger.debug("Skipping copy of '{}()' since it is abstract or listed elsewhere.", arg1);
return null;
}
if(arg3 != null){
@@ -156,8 +156,8 @@ class MergeAdapter extends ClassVisitor {
public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
return super.visitField(access, name, desc, signature, value);
}
-
-
+
+
public static class MergedClassResult{
public byte[] bytes;
public Collection<String> innerClasses;
@@ -166,32 +166,32 @@ class MergeAdapter extends ClassVisitor {
this.bytes = bytes;
this.innerClasses = innerClasses;
}
-
-
+
+
}
-
+
public static MergedClassResult getMergedClass(ClassSet set, byte[] precompiledClass, ClassNode generatedClass) throws IOException{
// Setup adapters for merging, remapping class names and class writing. This is done in reverse order of how they
// will be evaluated.
-
+
ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
RemapClasses re = new RemapClasses(set);
ClassVisitor remappingAdapter = new RemappingClassAdapter(cw, re);
ClassVisitor visitor = remappingAdapter;
if(generatedClass != null){
- visitor = new MergeAdapter(set, remappingAdapter, generatedClass);
+ visitor = new MergeAdapter(set, remappingAdapter, generatedClass);
}
ClassReader tReader = new ClassReader(precompiledClass);
tReader.accept(visitor, ClassReader.EXPAND_FRAMES);
byte[] outputClass = cw.toByteArray();
-
+
// enable when you want all the generated merged class files to also be written to disk.
//Files.write(outputClass, new File(String.format("/tmp/drill-generated-classes/%s-output.class", set.generated.dot)));
return new MergedClassResult(outputClass, re.getInnerClasses());
}
-
+
static class RemapClasses extends Remapper {
@@ -208,15 +208,15 @@ class MergeAdapter extends ClassVisitor {
@Override
public String map(String typeName) {
-
+
// remap the names of all classes that start with the old class name.
if (typeName.startsWith(top.precompiled.slash)) {
-
+
// write down all the sub classes.
if (typeName.startsWith(current.precompiled.slash + "$")){
innerClasses.add(typeName);
}
-
+
return typeName.replace(top.precompiled.slash, top.generated.slash);
}
return typeName;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 4221664..2728759 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -39,7 +39,7 @@ public class Accountor {
private final long total;
private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
private final FragmentHandle handle;
-
+
public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) {
// TODO: fix preallocation stuff
AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
@@ -106,7 +106,7 @@ public class Accountor {
}
sb.append(".\n");
-
+
Multimap<DebugStackTrace, DebugStackTrace> multi = LinkedListMultimap.create();
for (DebugStackTrace t : buffers.values()) {
multi.put(t, t);
@@ -114,7 +114,7 @@ public class Accountor {
for (DebugStackTrace entry : multi.keySet()) {
Collection<DebugStackTrace> allocs = multi.get(entry);
-
+
sb.append("\n\n\tTotal ");
sb.append(allocs.size());
sb.append(" allocation(s) of byte size(s): ");
@@ -127,20 +127,21 @@ public class Accountor {
}
sb.append(", ");
}
-
+
sb.append("at stack location:\n");
entry.addToString(sb);
}
-
+
throw new IllegalStateException(sb.toString());
-
+
}
-
+ remainder.close();
+
}
private class DebugStackTrace {
-
+
private StackTraceElement[] elements;
private long size;
private String desc;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 927e5a2..95e57d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.memory;
import java.util.concurrent.atomic.AtomicLong;
/**
- *
- *
+ *
+ *
* TODO: Fix this so that preallocation can never be released back to general pool until allocator is closed.
*/
public class AtomicRemainder {
@@ -32,6 +32,7 @@ public class AtomicRemainder {
private final AtomicLong unaccountable;
private final long max;
private final long pre;
+ private boolean closed = false;
public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
this.parent = parent;
@@ -52,7 +53,7 @@ public class AtomicRemainder {
/**
* Automatically allocate memory. This is used when an actual allocation happened to be larger than requested. This
* memory has already been used up so it must be accurately accounted for in future allocations.
- *
+ *
* @param size
*/
public void forceGet(long size) {
@@ -101,7 +102,7 @@ public class AtomicRemainder {
/**
* Return the memory accounting to the allocation pool. Make sure to first maintain hold of the preallocated memory.
- *
+ *
* @param size
*/
public void returnAllocation(long size) {
@@ -115,4 +116,10 @@ public class AtomicRemainder {
}
}
+ public void close(){
+ if(!closed){
+ closed = true;
+// if(parent != null) parent.returnAllocation(pre);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e8ab192..ddc5025 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.compile.ClassTransformer;
@@ -96,9 +97,11 @@ public class FragmentContext implements Closeable {
public DrillbitContext getDrillbitContext() {
return context;
}
-
+
public SchemaPlus getRootSchema(){
- return context.getStorage().getSchemaFactory().getOrphanedRootSchema();
+ SchemaPlus root = Frameworks.createRootSchema();
+ context.getStorage().getSchemaFactory().registerSchemas(null, root);
+ return root;
}
/**
@@ -116,7 +119,7 @@ public class FragmentContext implements Closeable {
public long getQueryStartTime() {
return this.queryStartTime;
}
-
+
/**
* The FragmentHandle for this Fragment
* @return FragmentHandle
@@ -136,10 +139,10 @@ public class FragmentContext implements Closeable {
public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException {
return getImplementationClass(cg.getCodeGenerator());
}
-
+
public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException {
long t1 = System.nanoTime();
-
+
T t = transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(),
cg.getMaterializedClassName());
logger.debug("Compile time: {} millis.", (System.nanoTime() - t1) / 1000 / 1000);
@@ -177,7 +180,7 @@ public class FragmentContext implements Closeable {
public void addDaemonThread(Thread thread) {
daemonThreads.add(thread);
thread.start();
-
+
}
public IncomingBuffers getBuffers() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java
new file mode 100644
index 0000000..7e6ae8e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.slf4j.Logger;
+
+public class Multitimer<T extends Enum<T>> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Multitimer.class);
+
+ private final long start;
+ private final long[] times;
+ private final Class<T> clazz;
+
+ public Multitimer(Class<T> clazz){
+ this.times = new long[clazz.getEnumConstants().length];
+ this.start = System.nanoTime();
+ this.clazz = clazz;
+ }
+
+ public void mark(T timer){
+ times[timer.ordinal()] = System.nanoTime();
+ }
+
+ public void log(Logger logger){
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 1564b09..17d47aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,6 +19,9 @@ package org.apache.drill.exec.ops;
import java.util.Collection;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.Frameworks;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -27,28 +30,48 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
public class QueryContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
-
- private QueryId queryId;
- private DrillbitContext drillbitContext;
- private WorkEventBus workBus;
-
- public QueryContext(QueryId queryId, DrillbitContext drllbitContext) {
+
+ private final QueryId queryId;
+ private final DrillbitContext drillbitContext;
+ private final WorkEventBus workBus;
+ private UserSession session;
+ public final Multitimer<QuerySetup> timer;
+
+ public QueryContext(UserSession session, QueryId queryId, DrillbitContext drllbitContext) {
super();
this.queryId = queryId;
this.drillbitContext = drllbitContext;
this.workBus = drllbitContext.getWorkBus();
+ this.session = session;
+ this.timer = new Multitimer<>(QuerySetup.class);
+ }
+
+ public UserSession getSession(){
+ return session;
}
-
+
+ public SchemaPlus getNewDefaultSchema(){
+ SchemaPlus rootSchema = Frameworks.createRootSchema();
+ drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), rootSchema);
+ SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
+ if(defaultSchema == null){
+ return rootSchema;
+ }else{
+ return defaultSchema;
+ }
+ }
+
+
public DrillbitEndpoint getCurrentEndpoint(){
return drillbitContext.getEndpoint();
}
-
+
public QueryId getQueryId() {
return queryId;
}
@@ -56,36 +79,32 @@ public class QueryContext {
public StoragePluginRegistry getStorage(){
return drillbitContext.getStorage();
}
-
-
+
+
public DistributedCache getCache(){
return drillbitContext.getCache();
}
-
+
public Collection<DrillbitEndpoint> getActiveEndpoints(){
return drillbitContext.getBits();
}
-
+
public PhysicalPlanReader getPlanReader(){
return drillbitContext.getPlanReader();
}
-
+
public DataConnectionCreator getDataConnectionsPool(){
return drillbitContext.getDataConnectionsPool();
}
-
+
public DrillConfig getConfig(){
return drillbitContext.getConfig();
}
-
+
public WorkEventBus getWorkBus(){
return workBus;
}
- public DrillSchemaFactory getFactory(){
- return drillbitContext.getSchemaFactory();
- }
-
public FunctionImplementationRegistry getFunctionRegistry(){
return drillbitContext.getFunctionImplementationRegistry();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java
new file mode 100644
index 0000000..ef73867
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+public enum QuerySetup {
+ START_SQL_PARSING,
+ START_SQL_VALIDATION,
+ START_SQL_TO_REL,
+ START_OPTIQ_REL_TO_DRILL_LOGICAL,
+ START_DRILL_LOGICAL_TO_PHYSICAL;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 6c111bb..38d56ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -18,11 +18,15 @@
package org.apache.drill.exec.physical.impl;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.ValueVector;
public interface OutputMutator {
public void removeField(MaterializedField field) throws SchemaChangeException;
+ public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException ;
+
+ @Deprecated
public void addField(ValueVector vector) throws SchemaChangeException ;
public void removeAllFields();
public void setNewSchema() throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 24ea9c4..ace2677 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -168,6 +169,15 @@ public class ScanBatch implements RecordBatch {
ScanBatch.this.schemaChanged = true;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
+ ValueVector v = TypeHelper.getNewVector(field, context.getAllocator());
+ if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ addField(v);
+ return (T) v;
+ }
+
}
@Override
@@ -179,7 +189,7 @@ public class ScanBatch implements RecordBatch {
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
-
+
public void cleanup(){
container.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 569bfc7..d2d8d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
public abstract class PriorityQueueTemplate implements PriorityQueue {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class);
-
+
private SelectionVector4 heapSv4;//This holds the heap
private SelectionVector4 finalSv4;//This is for final sorted output
private ExpandableHyperContainer hyperBatch;
@@ -58,7 +58,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
BatchSchema schema = container.getSchema();
VectorContainer newContainer = new VectorContainer();
for (MaterializedField field : schema) {
- int id = container.getValueVectorId(new SchemaPath(field.getName(), ExpressionPosition.UNKNOWN)).getFieldId();
+ int id = container.getValueVectorId(field.getAsSchemaPath()).getFieldId();
newContainer.add(container.getValueAccessorById(id, field.getValueClass()).getValueVectors());
}
newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
@@ -72,7 +72,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
}
v4.clear();
}
-
+
@Override
public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException{
Stopwatch watch = new Stopwatch();
@@ -184,13 +184,13 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
siftDown();
return value;
}
-
+
public void swap(int sv0, int sv1) {
int tmp = heapSv4.get(sv0);
heapSv4.set(sv0, heapSv4.get(sv1));
heapSv4.set(sv1, tmp);
}
-
+
public int compare(int leftIndex, int rightIndex) {
int sv1 = heapSv4.get(leftIndex);
int sv2 = heapSv4.get(rightIndex);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 5a7a6fa..775766d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -44,7 +44,7 @@ import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.vector.allocator.FixedVectorAllocator;
-public abstract class HashTableTemplate implements HashTable {
+public abstract class HashTableTemplate implements HashTable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class);
private static final boolean EXTRA_DEBUG = false;
@@ -72,7 +72,7 @@ public abstract class HashTableTemplate implements HashTable {
private int freeIndex = 0;
// Placeholder for the current index while probing the hash table
- private IntHolder currentIdxHolder;
+ private IntHolder currentIdxHolder;
private FragmentContext context;
@@ -86,7 +86,7 @@ public abstract class HashTableTemplate implements HashTable {
private RecordBatch outgoing;
// Hash table configuration parameters
- private HashTableConfig htConfig;
+ private HashTableConfig htConfig;
// The original container from which others may be cloned
private VectorContainer htContainerOrig;
@@ -96,15 +96,15 @@ public abstract class HashTableTemplate implements HashTable {
private int outputCount = 0;
// This class encapsulates the links, keys and values for up to BATCH_SIZE
- // *unique* records. Thus, suppose there are N incoming record batches, each
- // of size BATCH_SIZE..but they have M unique keys altogether, the number of
+ // *unique* records. Thus, suppose there are N incoming record batches, each
+ // of size BATCH_SIZE..but they have M unique keys altogether, the number of
// BatchHolders will be (M/BATCH_SIZE) + 1
public class BatchHolder {
// Container of vectors to hold type-specific keys
private VectorContainer htContainer;
- // Array of 'link' values
+ // Array of 'link' values
private IntVector links;
// Array of hash values - this is useful when resizing the hash table
@@ -122,7 +122,7 @@ public abstract class HashTableTemplate implements HashTable {
ValueVector vv = TypeHelper.getNewVector(w.getField(), context.getAllocator());
VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE);
htContainer.add(vv);
- }
+ }
}
links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
@@ -145,10 +145,10 @@ public abstract class HashTableTemplate implements HashTable {
}
// Check if the key at the currentIdx position in hash table matches the key
- // at the incomingRowIdx. if the key does not match, update the
+ // at the incomingRowIdx. if the key does not match, update the
// currentIdxHolder with the index of the next link.
- private boolean isKeyMatch(int incomingRowIdx,
- IntHolder currentIdxHolder,
+ private boolean isKeyMatch(int incomingRowIdx,
+ IntHolder currentIdxHolder,
boolean isProbe) {
int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
@@ -156,7 +156,7 @@ public abstract class HashTableTemplate implements HashTable {
if (isProbe)
match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
- else
+ else
match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch);
if (! match) {
@@ -165,9 +165,9 @@ public abstract class HashTableTemplate implements HashTable {
return match;
}
- // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
- // container at the specified index
- private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
+ // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
+ // container at the specified index
+ private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
int currentIdxWithinBatch = currentIdx & BATCH_MASK;
if (! setValue(incomingRowIdx, currentIdxWithinBatch)) {
@@ -179,7 +179,7 @@ public abstract class HashTableTemplate implements HashTable {
lastEntryBatch.updateLinks(lastEntryIdxWithinBatch, currentIdx);
}
- // since this is the last entry in the hash chain, the links array at position currentIdx
+ // since this is the last entry in the hash chain, the links array at position currentIdx
// will point to a null (empty) slot
links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
@@ -204,7 +204,7 @@ public abstract class HashTableTemplate implements HashTable {
IntVector newHashValues = allocMetadataVector(size, 0);
for (int i = 0; i <= maxOccupiedIdx; i++) {
- int entryIdxWithinBatch = i;
+ int entryIdxWithinBatch = i;
int entryIdx = entryIdxWithinBatch + batchStartIdx;
int hash = hashValues.getAccessor().get(entryIdxWithinBatch); // get the already saved hash value
int bucketIdx = getBucketIndex(hash, numbuckets);
@@ -237,35 +237,35 @@ public abstract class HashTableTemplate implements HashTable {
}
- }
+ }
links.clear();
hashValues.clear();
-
+
links = newLinks;
hashValues = newHashValues;
}
-
+
private boolean outputKeys() {
- /** for debugging
+ /** for debugging
Object tmp = (htContainer).getValueAccessorById(0, BigIntVector.class).getValueVector();
BigIntVector vv0 = null;
BigIntHolder holder = null;
- if (tmp != null) {
+ if (tmp != null) {
vv0 = ((BigIntVector) tmp);
holder = new BigIntHolder();
}
*/
- for (int i = 0; i <= maxOccupiedIdx; i++) {
+ for (int i = 0; i <= maxOccupiedIdx; i++) {
if (outputRecordKeys(i, outputCount) ) {
if (EXTRA_DEBUG) logger.debug("Outputting keys to {}", outputCount) ;
- // debugging
+ // debugging
// holder.value = vv0.getAccessor().get(i);
- // if (holder.value == 100018 || holder.value == 100021) {
+ // if (holder.value == 100018 || holder.value == 100021) {
// logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i, outputCount);
// }
@@ -288,41 +288,41 @@ public abstract class HashTableTemplate implements HashTable {
}
}
}
-
+
private void clear() {
htContainer.clear();;
links.clear();
hashValues.clear();
}
- // These methods will be code-generated
+ // These methods will be code-generated
@RuntimeOverridden
- protected void setupInterior(@Named("incomingBuild") RecordBatch incomingBuild,
+ protected void setupInterior(@Named("incomingBuild") RecordBatch incomingBuild,
@Named("incomingProbe") RecordBatch incomingProbe,
@Named("outgoing") RecordBatch outgoing,
@Named("htContainer") VectorContainer htContainer) {}
@RuntimeOverridden
- protected boolean isKeyMatchInternalBuild(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
+ protected boolean isKeyMatchInternalBuild(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
@RuntimeOverridden
- protected boolean isKeyMatchInternalProbe(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
+ protected boolean isKeyMatchInternalProbe(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
@RuntimeOverridden
- protected boolean setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
+ protected boolean setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
@RuntimeOverridden
- protected boolean outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {return false;}
+ protected boolean outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {return false;}
} // class BatchHolder
@Override
- public void setup(HashTableConfig htConfig, FragmentContext context,
+ public void setup(HashTableConfig htConfig, FragmentContext context,
RecordBatch incomingBuild, RecordBatch incomingProbe,
RecordBatch outgoing, VectorContainer htContainerOrig) {
- float loadf = htConfig.getLoadFactor();
+ float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
@@ -345,18 +345,17 @@ public abstract class HashTableTemplate implements HashTable {
threshold = (int) Math.ceil(tableSize * loadf);
- dummyIntField = MaterializedField.create(new SchemaPath("dummy", ExpressionPosition.UNKNOWN),
- Types.required(MinorType.INT));
+ dummyIntField = MaterializedField.create(SchemaPath.getSimplePath("dummy"), Types.required(MinorType.INT));
startIndices = allocMetadataVector(tableSize, EMPTY_SLOT);
- // Create the first batch holder
+ // Create the first batch holder
batchHolders = new ArrayList<BatchHolder>();
addBatchHolder();
doSetup(incomingBuild, incomingProbe);
- currentIdxHolder = new IntHolder();
+ currentIdxHolder = new IntHolder();
}
public int numBuckets() {
@@ -399,7 +398,7 @@ public abstract class HashTableTemplate implements HashTable {
public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) {
int hash = getHashBuild(incomingRowIdx);
- int i = getBucketIndex(hash, numBuckets());
+ int i = getBucketIndex(hash, numBuckets());
int startIdx = startIndices.getAccessor().get(i);
int currentIdx;
int currentIdxWithinBatch;
@@ -407,9 +406,9 @@ public abstract class HashTableTemplate implements HashTable {
BatchHolder lastEntryBatch = null;
int lastEntryIdxWithinBatch = EMPTY_SLOT;
-
+
if (startIdx == EMPTY_SLOT) {
- // this is the first entry in this bucket; find the first available slot in the
+ // this is the first entry in this bucket; find the first available slot in the
// container of keys and values
currentIdx = freeIndex++;
addBatchIfNeeded(currentIdx);
@@ -430,8 +429,8 @@ public abstract class HashTableTemplate implements HashTable {
bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK);
currentIdxHolder.value = currentIdx;
-
- // if startIdx is non-empty, follow the hash chain links until we find a matching
+
+ // if startIdx is non-empty, follow the hash chain links until we find a matching
// key or reach the end of the chain
while (true) {
currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
@@ -439,7 +438,7 @@ public abstract class HashTableTemplate implements HashTable {
if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
htIdxHolder.value = currentIdxHolder.value;
found = true;
- break;
+ break;
}
else if (currentIdxHolder.value == EMPTY_SLOT) {
lastEntryBatch = bh;
@@ -462,7 +461,7 @@ public abstract class HashTableTemplate implements HashTable {
htIdxHolder.value = currentIdx;
return PutStatus.KEY_ADDED;
}
- else
+ else
return PutStatus.PUT_FAILED;
}
@@ -471,7 +470,7 @@ public abstract class HashTableTemplate implements HashTable {
private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
- // resize hash table if needed and transfer the metadata
+ // resize hash table if needed and transfer the metadata
resizeAndRehashIfNeeded(currentIdx);
addBatchIfNeeded(currentIdx);
@@ -495,7 +494,7 @@ public abstract class HashTableTemplate implements HashTable {
if (currentIdx == EMPTY_SLOT)
return -1;
-
+
BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK);
currentIdxHolder.value = currentIdx;
@@ -503,7 +502,7 @@ public abstract class HashTableTemplate implements HashTable {
while (true) {
if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
- found = true;
+ found = true;
break;
} else if (currentIdxHolder.value == EMPTY_SLOT) {
break;
@@ -511,18 +510,18 @@ public abstract class HashTableTemplate implements HashTable {
bh = batchHolders.get( (currentIdxHolder.value >>> 16) & BATCH_MASK);
}
}
-
+
return found ? currentIdxHolder.value : -1;
}
- // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
+ // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
// currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
- // the capacity, we will add a new BatchHolder.
+ // the capacity, we will add a new BatchHolder.
private BatchHolder addBatchIfNeeded(int currentIdx) {
int totalBatchSize = batchHolders.size() * BATCH_SIZE;
-
+
if (currentIdx >= totalBatchSize) {
- BatchHolder bh = addBatchHolder();
+ BatchHolder bh = addBatchHolder();
if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
return bh;
}
@@ -538,19 +537,19 @@ public abstract class HashTableTemplate implements HashTable {
return bh;
}
- // Resize the hash table if needed by creating a new one with double the number of buckets.
+ // Resize the hash table if needed by creating a new one with double the number of buckets.
// For each entry in the old hash table, re-hash it to the new table and update the metadata
- // in the new table.. the metadata consists of the startIndices, links and hashValues.
- // Note that the keys stored in the BatchHolders are not moved around.
+ // in the new table.. the metadata consists of the startIndices, links and hashValues.
+ // Note that the keys stored in the BatchHolders are not moved around.
private void resizeAndRehashIfNeeded(int currentIdx) {
if (numEntries < threshold)
return;
if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
- // If the table size is already MAXIMUM_CAPACITY, don't resize
- // the table, but set the threshold to Integer.MAX_VALUE such that
- // future attempts to resize will return immediately.
+ // If the table size is already MAXIMUM_CAPACITY, don't resize
+ // the table, but set the threshold to Integer.MAX_VALUE such that
+ // future attempts to resize will return immediately.
if (tableSize == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
@@ -570,9 +569,9 @@ public abstract class HashTableTemplate implements HashTable {
for (int i = 0; i < batchHolders.size(); i++) {
BatchHolder bh = batchHolders.get(i) ;
int batchStartIdx = i * BATCH_SIZE;
- bh.rehash(tableSize, newStartIndices, batchStartIdx);
- }
-
+ bh.rehash(tableSize, newStartIndices, batchStartIdx);
+ }
+
startIndices.clear();
startIndices = newStartIndices;
@@ -607,11 +606,11 @@ public abstract class HashTableTemplate implements HashTable {
return vector;
}
- // These methods will be code-generated in the context of the outer class
+ // These methods will be code-generated in the context of the outer class
protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) ;
protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ;
-}
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 5fc3733..36428ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -84,9 +84,9 @@ import com.sun.codemodel.JExpr;
public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
- public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP,
+ public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP,
ClassGenerator.DEFAULT_SCALAR_MAP);
- public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
+ public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
public final MappingSet partitionMapping = new MappingSet("partitionIndex", null, "partitionVectors", null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
@@ -131,14 +131,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
this.minorFragmentSampleCount = cache.getCounter(mapKey);
-
- SchemaPath outputPath = new SchemaPath(popConfig.getRef().getPath(), ExpressionPosition.UNKNOWN);
+
+ SchemaPath outputPath = popConfig.getRef();
MaterializedField outputField = MaterializedField.create(outputPath, Types.required(TypeProtos.MinorType.INT));
this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, context.getAllocator());
-
+
}
-
+
@Override
public void cleanup() {
super.cleanup();
@@ -152,10 +152,10 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
IterOutcome upstream;
// Start collecting batches until recordsToSample records have been collected
-
+
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
builder.add(incoming);
-
+
recordsSampled += incoming.getRecordCount();
outer: while (recordsSampled < recordsToSample) {
@@ -211,8 +211,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
containerToCache.clear();
sampleToSave.clear();
return true;
-
-
+
+
}
/**
@@ -221,7 +221,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
* the samples, sorts all the records, builds a partition table, and attempts to push the partition table to the
* distributed cache. Whichever table gets pushed first becomes the table used by all fragments for partitioning.
- *
+ *
* @return True is successful. False if failed.
*/
private boolean getPartitionVectors() {
@@ -232,7 +232,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
if (!saveSamples()){
return false;
}
-
+
VectorAccessibleSerializable finalTable = null;
long val = minorFragmentSampleCount.incrementAndGet();
@@ -282,7 +282,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
private void buildTable() throws SchemaChangeException, ClassTransformationException, IOException {
// Get all samples from distributed map
-
+
SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
for (VectorAccessibleSerializable w : mmap.get(mapKey)) {
containerBuilder.add(w.get());
@@ -293,7 +293,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
List<Ordering> orderDefs = Lists.newArrayList();
int i = 0;
for (Ordering od : popConfig.getOrderings()) {
- SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
+ SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
}
@@ -317,7 +317,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
-
+
candidatePartitionTable.clear();
allSamplesContainer.clear();
containerBuilder.clear();
@@ -330,7 +330,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
* Ordering determines the value of each column. These records will later be sorted based on the values in each
* column, in the same order as the orderings.
- *
+ *
* @param sv4
* @param incoming
* @param outgoing
@@ -348,7 +348,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
int i = 0;
for (Ordering od : orderings) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
- SchemaPath schemaPath = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
+ SchemaPath schemaPath = SchemaPath.getSimplePath("f" + i++);
TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
.clearMode().setMode(TypeProtos.DataMode.REQUIRED);
TypeProtos.MajorType newType = builder.build();
@@ -423,9 +423,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
if (!getPartitionVectors()){
cleanup();
- return IterOutcome.STOP;
+ return IterOutcome.STOP;
}
-
+
batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
first = false;
@@ -497,7 +497,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
/**
* Sets up projection that will transfer all of the columns in batch, and also populate the partition column based on
* which partition a record falls into in the partition table
- *
+ *
* @param batch
* @throws SchemaChangeException
*/
@@ -547,7 +547,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
this.projector = context.getImplementationClass(cg);
- projector.setup(context, batch, this, transfers, partitionVectors, partitions, new SchemaPath(popConfig.getRef().getPath(), ExpressionPosition.UNKNOWN));
+ projector.setup(context, batch, this, transfers, partitionVectors, partitions, popConfig.getRef());
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index cd7e632..94fd385 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -98,7 +98,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
/**
* Send the record batch to the target node, then reset the value vectors
- *
+ *
* @return true if a flush was needed; otherwise false
* @throws SchemaChangeException
*/
@@ -106,7 +106,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
final ExecProtos.FragmentHandle handle = context.getHandle();
if (recordCount != 0) {
-
+
for(VectorWrapper<?> w : vectorContainer){
w.getValueVector().getMutator().setValueCount(recordCount);
}
@@ -147,7 +147,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
recordCount = 0;
vectorContainer.zeroVectors();
for (VectorWrapper<?> v : vectorContainer) {
- logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
+// logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE);
}
if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
@@ -173,10 +173,10 @@ public class OutgoingRecordBatch implements VectorAccessible {
ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
VectorAllocator.getAllocator(outgoingVector, 100).alloc(recordCapacity);
vectorContainer.add(outgoingVector);
- logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
+// logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
}
outSchema = bldr.build();
- logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
+// logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
}
/**
@@ -226,11 +226,11 @@ public class OutgoingRecordBatch implements VectorAccessible {
return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
}
-
+
private StatusHandler statusHandler = new StatusHandler();
private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
RpcException ex;
-
+
@Override
public void success(Ack value, ByteBuf buffer) {
sendCount.decrement();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4342f52..aaee8e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -24,10 +24,10 @@ import java.util.Set;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -58,11 +58,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private Projector projector;
private List<ValueVector> allocationVectors;
-
+
public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
super(pop, context, incoming);
}
-
+
@Override
public int getRecordCount() {
return incoming.getRecordCount();
@@ -85,32 +85,27 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private FieldReference getRef(NamedExpression e){
FieldReference ref = e.getRef();
PathSegment seg = ref.getRootSegment();
- if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
- return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
- }
+
+// if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
+// return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
+// }
return ref;
}
-
+
private boolean isAnyWildcard(List<NamedExpression> exprs){
for(NamedExpression e : exprs){
if(isWildcard(e)) return true;
}
return false;
}
-
+
private boolean isWildcard(NamedExpression ex){
- LogicalExpression expr = ex.getExpr();
- LogicalExpression ref = ex.getRef();
- if(expr instanceof SchemaPath && ref instanceof SchemaPath){
- PathSegment e = ((SchemaPath) expr).getRootSegment();
- PathSegment n = ((SchemaPath) ref).getRootSegment();
- if(e.isNamed() && e.getNameSegment().getPath().equals("*") && n.isNamed() && n.getChild() != null && n.getChild().isNamed() && n.getChild().getNameSegment().getPath().equals("*")){
- return true;
- }
- }
- return false;
+ if( !(ex.getExpr() instanceof SchemaPath)) return false;
+ NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+ NameSegment ref = ex.getRef().getRootSegment();
+ return ref.getPath().equals("*") && expr.getPath().equals("*");
}
-
+
@Override
protected void setupNewSchema() throws SchemaChangeException{
this.allocationVectors = Lists.newArrayList();
@@ -118,7 +113,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final List<NamedExpression> exprs = popConfig.getExprs();
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
-
+
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
Set<Integer> transferFieldIds = new HashSet();
@@ -128,7 +123,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
if(isAnyWildcard){
for(VectorWrapper<?> wrapper : incoming){
ValueVector vvIn = wrapper.getValueVector();
- TransferPair tp = wrapper.getValueVector().getTransferPair(new FieldReference(vvIn.getField().getName()));
+ String name = vvIn.getField().getDef().getName(vvIn.getField().getDef().getNameCount() - 1).getName();
+ FieldReference ref = new FieldReference(name);
+ TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
transfers.add(tp);
container.add(tp.getTo());
}
@@ -154,7 +151,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
transfers.add(tp);
container.add(tp.getTo());
transferFieldIds.add(vectorRead.getFieldId().getFieldId());
- logger.debug("Added transfer.");
+// logger.debug("Added transfer.");
}else{
// need to do evaluation.
ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
@@ -162,15 +159,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
TypedFieldId fid = container.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
cg.addExpr(write);
- logger.debug("Added eval.");
+// logger.debug("Added eval.");
}
}
-
+
}
-
+
container.buildSchema(incoming.getSchema().getSelectionVectorMode());
-
+
try {
this.projector = context.getImplementationClass(cg.getCodeGenerator());
projector.setup(context, incoming, this, transfers);
@@ -178,6 +175,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 5cdab96..29e629a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -59,7 +59,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
private final SortRecordBatchBuilder builder;
private Sorter sorter;
private BatchSchema schema;
-
+
public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) {
super(popConfig, context);
this.incoming = incoming;
@@ -87,8 +87,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return builder.getSv4();
}
-
-
+
+
@Override
public void cleanup() {
super.cleanup();
@@ -105,8 +105,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return IterOutcome.NONE;
}
}
-
-
+
+
try{
outer: while (true) {
IterOutcome upstream = incoming.next();
@@ -134,19 +134,19 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
throw new UnsupportedOperationException();
}
}
-
- if (schema == null){
+
+ if (schema == null || builder.isEmpty()){
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
}
-
+
builder.build(context, container);
sorter = createNewSorter();
sorter.setup(context, getSelectionVector4(), this.container);
sorter.sort(getSelectionVector4(), this.container);
return IterOutcome.OK_NEW_SCHEMA;
-
+
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
kill();
logger.error("Failure during query", ex);
@@ -163,19 +163,19 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-
+
return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
}
-
+
public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
throws ClassTransformationException, IOException, SchemaChangeException{
CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
ClassGenerator<Sorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
-
+
for(Ordering od : orderings){
// first, we rewrite the evaluation stack for each side of the comparison.
- ErrorCollector collector = new ErrorCollectorImpl();
+ ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
g.setMappingSet(leftMapping);
@@ -183,26 +183,26 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
g.setMappingSet(rightMapping);
HoldingContainer right = g.addExpr(expr, false);
g.setMappingSet(mainMapping);
-
+
// next we wrap the two comparison sides and add the expression block for the comparison.
LogicalExpression fh = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry());
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
+
if(od.getDirection() == Direction.ASCENDING){
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
}
}
-
+
g.getEvalBlock()._return(JExpr.lit(0));
-
+
return context.getImplementationClass(cg);
}
-
+
@Override
public WritableBatch getWritableBatch() {
throw new UnsupportedOperationException("A sort batch is not writable.");
@@ -213,7 +213,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
incoming.kill();
}
-
-
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 8980fdc..bf9db9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Lists;
public class SortRecordBatchBuilder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
-
+
private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
private int recordCount;
@@ -47,12 +47,12 @@ public class SortRecordBatchBuilder {
private final long maxBytes;
private SelectionVector4 sv4;
final PreAllocator svAllocator;
-
+
public SortRecordBatchBuilder(BufferAllocator a, long maxBytes){
this.maxBytes = maxBytes;
this.svAllocator = a.getNewPreAllocator();
}
-
+
private long getSize(VectorAccessible batch){
long bytes = 0;
for(VectorWrapper<?> v : batch){
@@ -60,11 +60,11 @@ public class SortRecordBatchBuilder {
}
return bytes;
}
-
+
/**
- * Add another record batch to the set of record batches.
+ * Add another record batch to the set of record batches.
* @param batch
- * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
+ * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
* @throws SchemaChangeException
*/
public boolean add(VectorAccessible batch){
@@ -79,7 +79,7 @@ public class SortRecordBatchBuilder {
if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
-
+
if (batch.getRecordCount() == 0) return true;
RecordBatchData bd = new RecordBatchData(batch);
@@ -112,15 +112,21 @@ public class SortRecordBatchBuilder {
return true;
}
+ public boolean isEmpty(){
+ return batches.isEmpty();
+ }
+
public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{
outputContainer.clear();
if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
- assert batches.keySet().size() > 0;
+ if(batches.keys().size() < 1){
+ assert false : "Invalid to have an empty set of batches with no schemas.";
+ }
sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
-
+
// now we're going to generate the sv4 pointers
switch(schema.getSelectionVectorMode()){
case NONE: {
@@ -150,7 +156,7 @@ public class SortRecordBatchBuilder {
default:
throw new UnsupportedOperationException();
}
-
+
// next, we'll create lists of each of the vector types.
ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
for(RecordBatchData rbd : batches.values()){
@@ -158,12 +164,12 @@ public class SortRecordBatchBuilder {
vectors.put(v.getField(), v);
}
}
-
+
for(MaterializedField f : vectors.keySet()){
List<ValueVector> v = vectors.get(f);
outputContainer.addHyperList(v, false);
}
-
+
outputContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
}
@@ -177,7 +183,7 @@ public class SortRecordBatchBuilder {
}
if(sv4 != null) sv4.clear();
}
-
+
public List<VectorContainer> getHeldRecordBatches() {
ArrayList<VectorContainer> containerList = Lists.newArrayList();
for (BatchSchema bs : batches.keySet()) {
@@ -190,5 +196,5 @@ public class SortRecordBatchBuilder {
batches.clear();
return containerList;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 379fad2..8d3a3e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -29,26 +29,30 @@ import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-public class IteratorValidatorBatchIterator implements RecordBatch{
+public class IteratorValidatorBatchIterator implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
private IterOutcome state = IterOutcome.NOT_YET;
private final RecordBatch incoming;
-
- public IteratorValidatorBatchIterator(RecordBatch incoming){
+
+ public IteratorValidatorBatchIterator(RecordBatch incoming) {
this.incoming = incoming;
}
-
- private void validateReadState(){
- switch(state){
+
+ private void validateReadState() {
+ switch (state) {
case OK:
case OK_NEW_SCHEMA:
return;
default:
- throw new IllegalStateException(String.format("You tried to do a batch data read operation when you were in a state of %s. You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.", state.name()));
+ throw new IllegalStateException(
+ String
+ .format(
+ "You tried to do a batch data read operation when you were in a state of %s. You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.",
+ state.name()));
}
}
-
+
@Override
public Iterator<VectorWrapper<?>> iterator() {
validateReadState();
@@ -105,10 +109,17 @@ public class IteratorValidatorBatchIterator implements RecordBatch{
public IterOutcome next() {
if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
state = incoming.next();
-
- if ((state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) && incoming.getRecordCount() > MAX_BATCH_SIZE)
- throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE));
-
+
+ if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
+ BatchSchema schema = incoming.getSchema();
+ if(schema.getFieldCount() == 0){
+ throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed.");
+ }
+ if(incoming.getRecordCount() > MAX_BATCH_SIZE){
+ throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE));
+ }
+ }
+
return state;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 42835fc..8bb3d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -170,7 +170,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
int count = selector.next();
if(count > 0){
long t = w.elapsed(TimeUnit.MICROSECONDS);
- logger.debug("Took {} us to merge {} records", t, count);
+// logger.debug("Took {} us to merge {} records", t, count);
container.setRecordCount(count);
return IterOutcome.OK;
}else{
@@ -185,7 +185,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
Stopwatch watch = new Stopwatch();
watch.start();
IterOutcome upstream = incoming.next();
- logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
+// logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
switch (upstream) {
case NONE:
break outer;
@@ -215,7 +215,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
Stopwatch w = new Stopwatch();
w.start();
sorter.sort(sv2);
- logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), sv2.getCount());
+// logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), sv2.getCount());
batchGroups.add(new BatchGroup(new RecordBatchData(incoming).getContainer(), sv2));
batchesSinceLastSpill++;
if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) {
@@ -223,7 +223,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
batchesSinceLastSpill = 0;
}
long t = w.elapsed(TimeUnit.MICROSECONDS);
- logger.debug("Took {} us to sort {} records", t, count);
+// logger.debug("Took {} us to sort {} records", t, count);
break;
default:
throw new UnsupportedOperationException();
@@ -343,11 +343,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
int i = 0;
for (BatchGroup group : batchGroupList) {
- vectors[i++] = group.getValueAccessorById(group.getValueVectorId(new SchemaPath(field.getName(),ExpressionPosition.UNKNOWN)).getFieldId(),
+ vectors[i++] = group.getValueAccessorById(group.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
field.getValueClass()).getValueVector();
if (group.hasSecond()) {
VectorContainer c = group.getSecondContainer();
- vectors[i++] = c.getValueAccessorById(c.getValueVectorId(new SchemaPath(field.getName(),ExpressionPosition.UNKNOWN)).getFieldId(),
+ vectors[i++] = c.getValueAccessorById(c.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
field.getValueClass()).getValueVector();
} else {
vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
index 9d02c44..cf3d188 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
@@ -40,7 +40,7 @@ import com.google.common.collect.Lists;
/**
*
* Base class for logical and physical Project implemented in Drill
- */
+ */
public abstract class DrillProjectRelBase extends ProjectRelBase implements DrillRelNode {
protected DrillProjectRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
RelDataType rowType) {
@@ -61,7 +61,7 @@ public abstract class DrillProjectRelBase extends ProjectRelBase implements Dril
List<NamedExpression> expressions = Lists.newArrayList();
for (Pair<RexNode, String> pair : projects()) {
LogicalExpression expr = DrillOptiq.toDrill(context, getChild(), pair.left);
- expressions.add(new NamedExpression(expr, new FieldReference("output." + pair.right)));
+ expressions.add(new NamedExpression(expr, new FieldReference(pair.right)));
}
return expressions;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
index ae777cb..d19b7a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
@@ -64,13 +64,13 @@ public class DrillProjectRel extends DrillProjectRelBase implements DrillRel {
}
return builder.build();
}
-
+
public static DrillProjectRel convert(Project project, ConversionContext context) throws InvalidRelException{
RelNode input = context.toRel(project.getInput());
List<RelDataTypeField> fields = Lists.newArrayList();
List<RexNode> exps = Lists.newArrayList();
for(NamedExpression expr : project.getSelections()){
- fields.add(new RelDataTypeFieldImpl(expr.getRef().getPath().toString(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) ));
+ fields.add(new RelDataTypeFieldImpl(expr.getRef().getRootSegment().getPath(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) ));
exps.add(context.toRex(expr.getExpr()));
}
return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index e5cc730..1492a28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -58,12 +58,12 @@ public class DrillRuleSets {
// Add support for WHERE style joins.
PushFilterPastProjectRule.INSTANCE,
PushFilterPastJoinRule.FILTER_ON_JOIN,
- PushJoinThroughJoinRule.RIGHT,
- PushJoinThroughJoinRule.LEFT,
+ PushJoinThroughJoinRule.RIGHT,
+ PushJoinThroughJoinRule.LEFT,
// End support for WHERE style joins.
-
+
//Add back rules
-
+
ExpandConversionRule.INSTANCE,
// SwapJoinRule.INSTANCE,
RemoveDistinctRule.INSTANCE,
@@ -80,7 +80,7 @@ public class DrillRuleSets {
// SwapJoinRule.INSTANCE, //
// PushJoinThroughJoinRule.RIGHT, //
// PushJoinThroughJoinRule.LEFT, //
-// PushSortPastProjectRule.INSTANCE, //
+// PushSortPastProjectRule.INSTANCE, //
////////////////////////////////
DrillScanRule.INSTANCE,
@@ -91,10 +91,10 @@ public class DrillRuleSets {
DrillLimitRule.INSTANCE,
DrillSortRule.INSTANCE,
DrillJoinRule.INSTANCE,
- DrillUnionRule.INSTANCE,
+ DrillUnionRule.INSTANCE,
MergeProjectRule.INSTANCE
));
-
+
public static final RuleSet DRILL_PHYSICAL_MEM = new DrillRuleSet(ImmutableSet.of( //
// DrillScanRule.INSTANCE,
// DrillFilterRule.INSTANCE,
@@ -115,8 +115,9 @@ public class DrillRuleSets {
StreamAggPrule.INSTANCE,
MergeJoinPrule.INSTANCE,
FilterPrule.INSTANCE,
- LimitPrule.INSTANCE,
- PushLimitToTopN.INSTANCE
+ LimitPrule.INSTANCE
+
+// PushLimitToTopN.INSTANCE
// ExpandConversionRule.INSTANCE,
// SwapJoinRule.INSTANCE,
@@ -135,14 +136,14 @@ public class DrillRuleSets {
// SwapJoinRule.INSTANCE, //
// PushJoinThroughJoinRule.RIGHT, //
// PushJoinThroughJoinRule.LEFT, //
-// PushSortPastProjectRule.INSTANCE, //
+// PushSortPastProjectRule.INSTANCE, //
));
-
+
public static final RuleSet DRILL_PHYSICAL_DISK = new DrillRuleSet(ImmutableSet.of( //
ProjectPrule.INSTANCE
-
+
));
-
+
private static class DrillRuleSet implements RuleSet{
final ImmutableSet<RelOptRule> rules;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
index 9456a81..128ba28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
@@ -25,17 +25,17 @@ import org.apache.drill.common.expression.SchemaPath;
public class ExprHelper {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExprHelper.class);
-
+
private final static String COMPOUND_FAIL_MESSAGE = "The current Optiq based logical plan interpreter does not complicated expressions. For Order By and Filter";
-
+
public static String getAggregateFieldName(FunctionCall c){
List<LogicalExpression> exprs = c.args;
if(exprs.size() != 1) throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
return getFieldName(exprs.iterator().next());
}
-
+
public static String getFieldName(LogicalExpression e){
- if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString();
+ //if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString();
throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
index 8573fb2..b75fb40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.physical;
import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTrait;
import org.eigenbase.relopt.RelTraitDef;
@@ -30,12 +31,12 @@ public class DrillDistributionTrait implements RelTrait {
public static DrillDistributionTrait SINGLETON = new DrillDistributionTrait(DistributionType.SINGLETON);
public static DrillDistributionTrait RANDOM_DISTRIBUTED = new DrillDistributionTrait(DistributionType.RANDOM_DISTRIBUTED);
public static DrillDistributionTrait ANY = new DrillDistributionTrait(DistributionType.ANY);
-
+
public static DrillDistributionTrait DEFAULT = ANY;
-
- private DistributionType type;
+
+ private DistributionType type;
private final ImmutableList<DistributionField> fields;
-
+
private DrillDistributionTrait(DistributionType type) {
assert (type == DistributionType.SINGLETON || type == DistributionType.RANDOM_DISTRIBUTED || type == DistributionType.ANY
|| type == DistributionType.ROUND_ROBIN_DISTRIBUTED || type == DistributionType.BROADCAST_DISTRIBUTED);
@@ -44,11 +45,15 @@ public class DrillDistributionTrait implements RelTrait {
}
public DrillDistributionTrait(DistributionType type, ImmutableList<DistributionField> fields) {
- assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
+ assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
this.type = type;
this.fields = fields;
}
+ @Override
+ public void register(RelOptPlanner planner) {
+ }
+
public boolean subsumes(RelTrait trait) {
if (trait instanceof DrillDistributionTrait) {
@@ -65,19 +70,19 @@ public class DrillDistributionTrait implements RelTrait {
assert(thisFields.size() > 0 && requiredFields.size() > 0);
// A subset of the required distribution columns can satisfy (subsume) the requirement
- // e.g: required distribution: {a, b, c}
+ // e.g: required distribution: {a, b, c}
// Following can satisfy the requirements: {a}, {b}, {c}, {a, b}, {b, c}, {a, c} or {a, b, c}
return (requiredFields.containsAll(thisFields));
}
else if (requiredDist == DistributionType.RANDOM_DISTRIBUTED) {
- return true; // hash distribution subsumes random distribution and ANY distribution
+ return true; // hash distribution subsumes random distribution and ANY distribution
}
}
}
return this.equals(trait);
}
-
+
public RelTraitDef<DrillDistributionTrait> getTraitDef() {
return DrillDistributionTraitDef.INSTANCE;
}
@@ -93,7 +98,7 @@ public class DrillDistributionTrait implements RelTrait {
public int hashCode() {
return fields == null ? type.hashCode() : type.hashCode() | fields.hashCode() << 4 ;
}
-
+
public boolean equals(Object obj) {
if (this == obj) {
return true;
@@ -110,13 +115,13 @@ public class DrillDistributionTrait implements RelTrait {
return fields == null ? this.type.toString() : this.type.toString() + "(" + fields + ")";
}
-
+
public static class DistributionField {
/**
* 0-based index of field being DISTRIBUTED.
*/
private final int fieldId;
-
+
public DistributionField (int fieldId) {
this.fieldId = fieldId;
}
@@ -128,18 +133,18 @@ public class DrillDistributionTrait implements RelTrait {
DistributionField other = (DistributionField) obj;
return this.fieldId == other.fieldId;
}
-
+
public int hashCode() {
return this.fieldId;
}
-
+
public int getFieldId() {
return this.fieldId;
}
-
+
public String toString() {
return String.format("[$%s]", this.fieldId);
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index f392a18..0fc3abd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -40,19 +40,18 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
return new FilterPrel(getCluster(), traitSet, sole(inputs), getCondition());
}
-
+
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getChild();
-
+
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
-
- //Currently, Filter only accepts "NONE", SV2, SV4.
-
+
+ //Currently, Filter accepts "NONE", SV2, SV4.
+
Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f);
- creator.addPhysicalOperator(p);
-
+
return p;
}
-
+
}