Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/20 03:18:53 UTC

[45/51] [abbrv] Move to Optiq 0.6. Also includes: -improve exception catching -move schema path parsing to Antlr -close zookeeper connection if client created it -enhance BaseTestQuery and have other query tests utilize it -Various test fixes for better m

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
index 080679b..30cde91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
@@ -50,14 +50,14 @@ import com.google.common.io.Files;
  * methods and fields of the class to merge to the class that is being visited.
  */
 class MergeAdapter extends ClassVisitor {
-  
+
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeAdapter.class);
-  
+
   private ClassNode classToMerge;
   private ClassSet set;
-  
+
   private Set<String> mergingNames = Sets.newHashSet();
-  
+
   private MergeAdapter(ClassSet set, ClassVisitor cv, ClassNode cn) {
     super(Opcodes.ASM4, cv);
     this.classToMerge = cn;
@@ -88,16 +88,16 @@ class MergeAdapter extends ClassVisitor {
     System.out.println("Annotation");
     return super.visitAnnotation(desc, visible);
   }
-  
+
   // visit the class
   public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
     // use the access and names of the impl class.
     if(name.contains("$")){
       super.visit(version, access, name, signature, superName, interfaces);
     }else{
-      super.visit(version, access ^ Modifier.ABSTRACT | Modifier.FINAL, name, signature, superName, interfaces);  
+      super.visit(version, access ^ Modifier.ABSTRACT | Modifier.FINAL, name, signature, superName, interfaces);
     }
-    
+
 //    this.cname = name;
   }
 
@@ -107,8 +107,8 @@ class MergeAdapter extends ClassVisitor {
 
     // skip all abstract methods as they should have implementations.
     if ((access & Modifier.ABSTRACT) != 0 || mergingNames.contains(arg1)) {
-      
-      logger.debug("Skipping copy of '{}()' since it is abstract or listed elsewhere.", arg1);
+
+//      logger.debug("Skipping copy of '{}()' since it is abstract or listed elsewhere.", arg1);
       return null;
     }
     if(arg3 != null){
@@ -156,8 +156,8 @@ class MergeAdapter extends ClassVisitor {
   public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
     return super.visitField(access, name, desc, signature, value);
   }
-  
-  
+
+
   public static class MergedClassResult{
     public byte[] bytes;
     public Collection<String> innerClasses;
@@ -166,32 +166,32 @@ class MergeAdapter extends ClassVisitor {
       this.bytes = bytes;
       this.innerClasses = innerClasses;
     }
-    
-    
+
+
   }
-  
+
   public static MergedClassResult getMergedClass(ClassSet set, byte[] precompiledClass, ClassNode generatedClass) throws IOException{
 
     // Setup adapters for merging, remapping class names and class writing. This is done in reverse order of how they
     // will be evaluated.
-    
+
     ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
     RemapClasses re = new RemapClasses(set);
     ClassVisitor remappingAdapter = new RemappingClassAdapter(cw, re);
     ClassVisitor visitor = remappingAdapter;
     if(generatedClass != null){
-      visitor = new MergeAdapter(set, remappingAdapter, generatedClass);  
+      visitor = new MergeAdapter(set, remappingAdapter, generatedClass);
     }
     ClassReader tReader = new ClassReader(precompiledClass);
     tReader.accept(visitor, ClassReader.EXPAND_FRAMES);
     byte[] outputClass = cw.toByteArray();
-    
+
     // enable when you want all the generated merged class files to also be written to disk.
     //Files.write(outputClass, new File(String.format("/tmp/drill-generated-classes/%s-output.class", set.generated.dot)));
 
     return new MergedClassResult(outputClass, re.getInnerClasses());
   }
-  
+
 
   static class RemapClasses extends Remapper {
 
@@ -208,15 +208,15 @@ class MergeAdapter extends ClassVisitor {
 
     @Override
     public String map(String typeName) {
-      
+
       // remap the names of all classes that start with the old class name.
       if (typeName.startsWith(top.precompiled.slash)) {
-        
+
         // write down all the sub classes.
         if (typeName.startsWith(current.precompiled.slash + "$")){
           innerClasses.add(typeName);
         }
-          
+
         return typeName.replace(top.precompiled.slash, top.generated.slash);
       }
       return typeName;
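
RemapClasses above renames every type whose internal name starts with the precompiled template's name to the generated class's name, collecting inner classes along the way, and getMergedClass() chains it behind the merging visitor via a RemappingClassAdapter. A self-contained sketch of that remapping step alone, assuming ASM 4's commons classes; the class and method names here are placeholders, not Drill's MergeAdapter:

    import org.objectweb.asm.ClassReader;
    import org.objectweb.asm.ClassWriter;
    import org.objectweb.asm.commons.Remapper;
    import org.objectweb.asm.commons.RemappingClassAdapter;

    public class RemapSketch {
      // Rewrites every reference to oldName (an internal name such as "com/example/Template")
      // so it points at newName instead, including inner classes like "com/example/Template$1".
      public static byte[] remap(byte[] classBytes, final String oldName, final String newName) {
        ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
        Remapper remapper = new Remapper() {
          @Override
          public String map(String typeName) {
            // Same prefix test as RemapClasses.map() above.
            return typeName.startsWith(oldName) ? typeName.replace(oldName, newName) : typeName;
          }
        };
        new ClassReader(classBytes).accept(new RemappingClassAdapter(cw, remapper), ClassReader.EXPAND_FRAMES);
        return cw.toByteArray();
      }
    }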

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 4221664..2728759 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -39,7 +39,7 @@ public class Accountor {
   private final long total;
   private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
   private final FragmentHandle handle;
-  
+
   public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) {
     // TODO: fix preallocation stuff
     AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
@@ -106,7 +106,7 @@ public class Accountor {
       }
       sb.append(".\n");
 
-      
+
       Multimap<DebugStackTrace, DebugStackTrace> multi = LinkedListMultimap.create();
       for (DebugStackTrace t : buffers.values()) {
         multi.put(t, t);
@@ -114,7 +114,7 @@ public class Accountor {
 
       for (DebugStackTrace entry : multi.keySet()) {
         Collection<DebugStackTrace> allocs = multi.get(entry);
-        
+
         sb.append("\n\n\tTotal ");
         sb.append(allocs.size());
         sb.append(" allocation(s) of byte size(s): ");
@@ -127,20 +127,21 @@ public class Accountor {
           }
           sb.append(", ");
         }
-        
+
         sb.append("at stack location:\n");
         entry.addToString(sb);
       }
-      
+
       throw new IllegalStateException(sb.toString());
-      
+
     }
 
-    
+    remainder.close();
+
   }
 
   private class DebugStackTrace {
-    
+
     private StackTraceElement[] elements;
     private long size;
     private String desc;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 927e5a2..95e57d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.memory;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * 
- * 
+ *
+ *
  * TODO: Fix this so that preallocation can never be released back to general pool until allocator is closed.
  */
 public class AtomicRemainder {
@@ -32,6 +32,7 @@ public class AtomicRemainder {
   private final AtomicLong unaccountable;
   private final long max;
   private final long pre;
+  private boolean closed = false;
 
   public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
     this.parent = parent;
@@ -52,7 +53,7 @@ public class AtomicRemainder {
   /**
    * Automatically allocate memory. This is used when an actual allocation happened to be larger than requested. This
    * memory has already been used up so it must be accurately accounted for in future allocations.
-   * 
+   *
    * @param size
    */
   public void forceGet(long size) {
@@ -101,7 +102,7 @@ public class AtomicRemainder {
 
   /**
    * Return the memory accounting to the allocation pool. Make sure to first maintain hold of the preallocated memory.
-   * 
+   *
    * @param size
    */
   public void returnAllocation(long size) {
@@ -115,4 +116,10 @@ public class AtomicRemainder {
     }
   }
 
+  public void close(){
+    if(!closed){
+      closed = true;
+//      if(parent != null) parent.returnAllocation(pre);
+    }
+  }
 }
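
The new AtomicRemainder.close() guards against double release with a plain boolean flag (the actual return of the preallocation to the parent is still commented out). A minimal stand-alone sketch of that idempotent-close pattern; the class and field names are illustrative, not Drill's:

    public class IdempotentCloseSketch implements AutoCloseable {
      private boolean closed = false;        // same guard as AtomicRemainder.closed
      private final Runnable releaseAction;  // e.g. return a preallocation to a parent pool

      public IdempotentCloseSketch(Runnable releaseAction) {
        this.releaseAction = releaseAction;
      }

      @Override
      public void close() {
        if (!closed) {       // only the first close() releases; later calls are no-ops
          closed = true;
          releaseAction.run();
        }
      }
    }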

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e8ab192..ddc5025 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.Frameworks;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.compile.ClassTransformer;
@@ -96,9 +97,11 @@ public class FragmentContext implements Closeable {
   public DrillbitContext getDrillbitContext() {
     return context;
   }
-  
+
   public SchemaPlus getRootSchema(){
-    return context.getStorage().getSchemaFactory().getOrphanedRootSchema();
+    SchemaPlus root = Frameworks.createRootSchema();
+    context.getStorage().getSchemaFactory().registerSchemas(null, root);
+    return root;
   }
 
   /**
@@ -116,7 +119,7 @@ public class FragmentContext implements Closeable {
   public long getQueryStartTime() {
       return this.queryStartTime;
   }
-  
+
   /**
    * The FragmentHandle for this Fragment
    * @return FragmentHandle
@@ -136,10 +139,10 @@ public class FragmentContext implements Closeable {
   public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException {
     return getImplementationClass(cg.getCodeGenerator());
   }
-  
+
   public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException {
     long t1 = System.nanoTime();
-    
+
     T t = transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(),
         cg.getMaterializedClassName());
     logger.debug("Compile time: {} millis.", (System.nanoTime() - t1) / 1000 / 1000);
@@ -177,7 +180,7 @@ public class FragmentContext implements Closeable {
   public void addDaemonThread(Thread thread) {
     daemonThreads.add(thread);
     thread.start();
-    
+
   }
 
   public IncomingBuffers getBuffers() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java
new file mode 100644
index 0000000..7e6ae8e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/Multitimer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.slf4j.Logger;
+
+public class Multitimer<T extends Enum<T>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Multitimer.class);
+
+  private final long start;
+  private final long[] times;
+  private final Class<T> clazz;
+
+  public Multitimer(Class<T> clazz){
+    this.times = new long[clazz.getEnumConstants().length];
+    this.start = System.nanoTime();
+    this.clazz = clazz;
+  }
+
+  public void mark(T timer){
+    times[timer.ordinal()] = System.nanoTime();
+  }
+
+  public void log(Logger logger){
+
+  }
+}
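
Multitimer records one nanosecond timestamp per enum constant; the QuerySetup enum added later in this patch is the phase set it is used with. A usage sketch under those assumptions (the sleeps stand in for real work, and log() above still has an empty body, so nothing is printed yet):

    package org.apache.drill.exec.ops;

    public class MultitimerUsageSketch {
      static final org.slf4j.Logger logger =
          org.slf4j.LoggerFactory.getLogger(MultitimerUsageSketch.class);

      public static void main(String[] args) throws InterruptedException {
        Multitimer<QuerySetup> timer = new Multitimer<>(QuerySetup.class);
        timer.mark(QuerySetup.START_SQL_PARSING);
        Thread.sleep(5);                              // stand-in for the parsing phase
        timer.mark(QuerySetup.START_SQL_VALIDATION);
        Thread.sleep(5);                              // stand-in for the validation phase
        timer.mark(QuerySetup.START_SQL_TO_REL);
        timer.log(logger);                            // currently a no-op: log() has no body yet
      }
    }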

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 1564b09..17d47aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,6 +19,9 @@ package org.apache.drill.exec.ops;
 
 import java.util.Collection;
 
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.tools.Frameworks;
+
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
@@ -27,28 +30,48 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
+import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
 
 public class QueryContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
-  
-  private QueryId queryId;
-  private DrillbitContext drillbitContext;
-  private WorkEventBus workBus;
-  
-  public QueryContext(QueryId queryId, DrillbitContext drllbitContext) {
+
+  private final QueryId queryId;
+  private final DrillbitContext drillbitContext;
+  private final WorkEventBus workBus;
+  private UserSession session;
+  public final Multitimer<QuerySetup> timer;
+
+  public QueryContext(UserSession session, QueryId queryId, DrillbitContext drllbitContext) {
     super();
     this.queryId = queryId;
     this.drillbitContext = drllbitContext;
     this.workBus = drllbitContext.getWorkBus();
+    this.session = session;
+    this.timer = new Multitimer<>(QuerySetup.class);
+  }
+
+  public UserSession getSession(){
+    return session;
   }
-  
+
+  public SchemaPlus getNewDefaultSchema(){
+    SchemaPlus rootSchema = Frameworks.createRootSchema();
+    drillbitContext.getSchemaFactory().registerSchemas(session.getUser(), rootSchema);
+    SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
+    if(defaultSchema == null){
+      return rootSchema;
+    }else{
+      return defaultSchema;
+    }
+  }
+
+
   public DrillbitEndpoint getCurrentEndpoint(){
     return drillbitContext.getEndpoint();
   }
-  
+
   public QueryId getQueryId() {
     return queryId;
   }
@@ -56,36 +79,32 @@ public class QueryContext {
   public StoragePluginRegistry getStorage(){
     return drillbitContext.getStorage();
   }
-  
-  
+
+
   public DistributedCache getCache(){
     return drillbitContext.getCache();
   }
-  
+
   public Collection<DrillbitEndpoint> getActiveEndpoints(){
     return drillbitContext.getBits();
   }
-  
+
   public PhysicalPlanReader getPlanReader(){
     return drillbitContext.getPlanReader();
   }
-  
+
   public DataConnectionCreator getDataConnectionsPool(){
     return drillbitContext.getDataConnectionsPool();
   }
-  
+
   public DrillConfig getConfig(){
     return drillbitContext.getConfig();
   }
-  
+
   public WorkEventBus getWorkBus(){
     return workBus;
   }
 
-  public DrillSchemaFactory getFactory(){
-    return drillbitContext.getSchemaFactory();
-  }
-  
   public FunctionImplementationRegistry getFunctionRegistry(){
     return drillbitContext.getFunctionImplementationRegistry();
   }
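
Both FragmentContext.getRootSchema() and QueryContext.getNewDefaultSchema() now follow the same pattern: create a fresh Optiq root schema with Frameworks.createRootSchema(), let the storage plugins register their schemas under it (optionally for a specific user), then prefer the session's default schema and fall back to the root when none is set. A sketch stitching those calls together; the helper class itself is hypothetical, but the calls are the ones used above:

    import net.hydromatic.optiq.SchemaPlus;
    import net.hydromatic.optiq.tools.Frameworks;

    import org.apache.drill.exec.rpc.user.UserSession;
    import org.apache.drill.exec.store.StoragePluginRegistry;

    public class DefaultSchemaSketch {
      // Build a fresh root per query, register plugin schemas for this user,
      // then resolve the session default or fall back to the root.
      static SchemaPlus defaultSchemaFor(StoragePluginRegistry storage, UserSession session) {
        SchemaPlus root = Frameworks.createRootSchema();
        storage.getSchemaFactory().registerSchemas(session.getUser(), root);
        SchemaPlus defaultSchema = session.getDefaultSchema(root);
        return defaultSchema != null ? defaultSchema : root;
      }
    }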

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java
new file mode 100644
index 0000000..ef73867
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QuerySetup.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+public enum QuerySetup {
+  START_SQL_PARSING,
+  START_SQL_VALIDATION,
+  START_SQL_TO_REL,
+  START_OPTIQ_REL_TO_DRILL_LOGICAL,
+  START_DRILL_LOGICAL_TO_PHYSICAL;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 6c111bb..38d56ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -18,11 +18,15 @@
 package org.apache.drill.exec.physical.impl;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;
 
 public interface OutputMutator {
   public void removeField(MaterializedField field) throws SchemaChangeException;
+  public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException ;
+
+  @Deprecated
   public void addField(ValueVector vector) throws SchemaChangeException ;
   public void removeAllFields();
   public void setNewSchema() throws SchemaChangeException;
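
The new OutputMutator.addField(MaterializedField, Class<T>) lets a record reader request a vector of a specific type and have the scan batch create, register, and type-check it (see the ScanBatch change that follows). A hypothetical reader-side usage sketch; the column name is made up and the import paths are as used in the surrounding files:

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.exception.SchemaChangeException;
    import org.apache.drill.exec.physical.impl.OutputMutator;
    import org.apache.drill.exec.record.MaterializedField;
    import org.apache.drill.exec.vector.IntVector;

    public class ReaderSetupSketch {
      // Ask the mutator for a required-INT vector; ScanBatch allocates it via TypeHelper and
      // throws SchemaChangeException if the allocated vector is not an IntVector.
      void setupColumn(OutputMutator output) throws SchemaChangeException {
        MaterializedField field =
            MaterializedField.create(SchemaPath.getSimplePath("rowid"), Types.required(MinorType.INT));
        IntVector vector = output.addField(field, IntVector.class);
        // populate via vector.getMutator() as records are read ...
      }
    }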

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 24ea9c4..ace2677 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -168,6 +169,15 @@ public class ScanBatch implements RecordBatch {
       ScanBatch.this.schemaChanged = true;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
+      ValueVector v = TypeHelper.getNewVector(field, context.getAllocator());
+      if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+      addField(v);
+      return (T) v;
+    }
+
   }
 
   @Override
@@ -179,7 +189,7 @@ public class ScanBatch implements RecordBatch {
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
   }
-  
+
   public void cleanup(){
     container.clear();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 569bfc7..d2d8d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
 
 public abstract class PriorityQueueTemplate implements PriorityQueue {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueTemplate.class);
-  
+
   private SelectionVector4 heapSv4;//This holds the heap
   private SelectionVector4 finalSv4;//This is for final sorted output
   private ExpandableHyperContainer hyperBatch;
@@ -58,7 +58,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     BatchSchema schema = container.getSchema();
     VectorContainer newContainer = new VectorContainer();
     for (MaterializedField field : schema) {
-      int id = container.getValueVectorId(new SchemaPath(field.getName(), ExpressionPosition.UNKNOWN)).getFieldId();
+      int id = container.getValueVectorId(field.getAsSchemaPath()).getFieldId();
       newContainer.add(container.getValueAccessorById(id, field.getValueClass()).getValueVectors());
     }
     newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
@@ -72,7 +72,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     }
     v4.clear();
   }
-  
+
   @Override
   public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException{
     Stopwatch watch = new Stopwatch();
@@ -184,13 +184,13 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
     siftDown();
     return value;
   }
-  
+
   public void swap(int sv0, int sv1) {
     int tmp = heapSv4.get(sv0);
     heapSv4.set(sv0, heapSv4.get(sv1));
     heapSv4.set(sv1, tmp);
   }
-  
+
   public int compare(int leftIndex, int rightIndex) {
     int sv1 = heapSv4.get(leftIndex);
     int sv2 = heapSv4.get(rightIndex);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 5a7a6fa..775766d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -44,7 +44,7 @@ import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.vector.allocator.FixedVectorAllocator;
 
-public abstract class HashTableTemplate implements HashTable { 
+public abstract class HashTableTemplate implements HashTable {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class);
   private static final boolean EXTRA_DEBUG = false;
@@ -72,7 +72,7 @@ public abstract class HashTableTemplate implements HashTable {
   private int freeIndex = 0;
 
   // Placeholder for the current index while probing the hash table
-  private IntHolder currentIdxHolder; 
+  private IntHolder currentIdxHolder;
 
   private FragmentContext context;
 
@@ -86,7 +86,7 @@ public abstract class HashTableTemplate implements HashTable {
   private RecordBatch outgoing;
 
   // Hash table configuration parameters
-  private HashTableConfig htConfig; 
+  private HashTableConfig htConfig;
 
   // The original container from which others may be cloned
   private VectorContainer htContainerOrig;
@@ -96,15 +96,15 @@ public abstract class HashTableTemplate implements HashTable {
   private int outputCount = 0;
 
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
-  // *unique* records. Thus, suppose there are N incoming record batches, each 
-  // of size BATCH_SIZE..but they have M unique keys altogether, the number of 
+  // *unique* records. Thus, suppose there are N incoming record batches, each
+  // of size BATCH_SIZE..but they have M unique keys altogether, the number of
   // BatchHolders will be (M/BATCH_SIZE) + 1
   public class BatchHolder {
 
     // Container of vectors to hold type-specific keys
     private VectorContainer htContainer;
 
-    // Array of 'link' values 
+    // Array of 'link' values
     private IntVector links;
 
     // Array of hash values - this is useful when resizing the hash table
@@ -122,7 +122,7 @@ public abstract class HashTableTemplate implements HashTable {
           ValueVector vv = TypeHelper.getNewVector(w.getField(), context.getAllocator());
           VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE);
           htContainer.add(vv);
-        }      
+        }
       }
 
       links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
@@ -145,10 +145,10 @@ public abstract class HashTableTemplate implements HashTable {
     }
 
     // Check if the key at the currentIdx position in hash table matches the key
-    // at the incomingRowIdx. if the key does not match, update the 
+    // at the incomingRowIdx. if the key does not match, update the
     // currentIdxHolder with the index of the next link.
-    private boolean isKeyMatch(int incomingRowIdx, 
-                               IntHolder currentIdxHolder, 
+    private boolean isKeyMatch(int incomingRowIdx,
+                               IntHolder currentIdxHolder,
                                boolean isProbe) {
 
       int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
@@ -156,7 +156,7 @@ public abstract class HashTableTemplate implements HashTable {
 
       if (isProbe)
         match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
-      else 
+      else
         match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch);
 
       if (! match) {
@@ -165,9 +165,9 @@ public abstract class HashTableTemplate implements HashTable {
       return match;
     }
 
-    // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table 
-    // container at the specified index 
-    private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) { 
+    // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
+    // container at the specified index
+    private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
       int currentIdxWithinBatch = currentIdx & BATCH_MASK;
 
       if (! setValue(incomingRowIdx, currentIdxWithinBatch)) {
@@ -179,7 +179,7 @@ public abstract class HashTableTemplate implements HashTable {
         lastEntryBatch.updateLinks(lastEntryIdxWithinBatch, currentIdx);
       }
 
-      // since this is the last entry in the hash chain, the links array at position currentIdx 
+      // since this is the last entry in the hash chain, the links array at position currentIdx
       // will point to a null (empty) slot
       links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
       hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
@@ -204,7 +204,7 @@ public abstract class HashTableTemplate implements HashTable {
       IntVector newHashValues = allocMetadataVector(size, 0);
 
       for (int i = 0; i <= maxOccupiedIdx; i++) {
-        int entryIdxWithinBatch = i; 
+        int entryIdxWithinBatch = i;
         int entryIdx = entryIdxWithinBatch + batchStartIdx;
         int hash = hashValues.getAccessor().get(entryIdxWithinBatch); // get the already saved hash value
         int bucketIdx = getBucketIndex(hash, numbuckets);
@@ -237,35 +237,35 @@ public abstract class HashTableTemplate implements HashTable {
 
         }
 
-      }      
+      }
 
       links.clear();
       hashValues.clear();
-      
+
       links = newLinks;
       hashValues = newHashValues;
     }
-    
+
     private boolean outputKeys() {
 
-      /** for debugging 
+      /** for debugging
       Object tmp = (htContainer).getValueAccessorById(0, BigIntVector.class).getValueVector();
       BigIntVector vv0 = null;
       BigIntHolder holder = null;
 
-      if (tmp != null) { 
+      if (tmp != null) {
         vv0 = ((BigIntVector) tmp);
         holder = new BigIntHolder();
       }
       */
 
-      for (int i = 0; i <= maxOccupiedIdx; i++) { 
+      for (int i = 0; i <= maxOccupiedIdx; i++) {
         if (outputRecordKeys(i, outputCount) ) {
           if (EXTRA_DEBUG) logger.debug("Outputting keys to {}", outputCount) ;
 
-          // debugging 
+          // debugging
           // holder.value = vv0.getAccessor().get(i);
-          // if (holder.value == 100018 || holder.value == 100021) { 
+          // if (holder.value == 100018 || holder.value == 100021) {
           //  logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i, outputCount);
           // }
 
@@ -288,41 +288,41 @@ public abstract class HashTableTemplate implements HashTable {
         }
       }
     }
-    
+
     private void clear() {
       htContainer.clear();;
       links.clear();
       hashValues.clear();
     }
 
-    // These methods will be code-generated 
+    // These methods will be code-generated
 
     @RuntimeOverridden
-    protected void setupInterior(@Named("incomingBuild") RecordBatch incomingBuild, 
+    protected void setupInterior(@Named("incomingBuild") RecordBatch incomingBuild,
                                  @Named("incomingProbe") RecordBatch incomingProbe,
                                  @Named("outgoing") RecordBatch outgoing,
                                  @Named("htContainer") VectorContainer htContainer) {}
 
     @RuntimeOverridden
-    protected boolean isKeyMatchInternalBuild(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;} 
+    protected boolean isKeyMatchInternalBuild(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
 
     @RuntimeOverridden
-    protected boolean isKeyMatchInternalProbe(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;} 
+    protected boolean isKeyMatchInternalProbe(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
 
     @RuntimeOverridden
-    protected boolean setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;} 
+    protected boolean setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {return false;}
 
     @RuntimeOverridden
-    protected boolean outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {return false;} 
+    protected boolean outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {return false;}
 
   } // class BatchHolder
 
 
   @Override
-  public void setup(HashTableConfig htConfig, FragmentContext context, 
+  public void setup(HashTableConfig htConfig, FragmentContext context,
                     RecordBatch incomingBuild, RecordBatch incomingProbe,
                     RecordBatch outgoing, VectorContainer htContainerOrig) {
-    float loadf = htConfig.getLoadFactor(); 
+    float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
     if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
@@ -345,18 +345,17 @@ public abstract class HashTableTemplate implements HashTable {
 
     threshold = (int) Math.ceil(tableSize * loadf);
 
-    dummyIntField = MaterializedField.create(new SchemaPath("dummy", ExpressionPosition.UNKNOWN),
-                                             Types.required(MinorType.INT));
+    dummyIntField = MaterializedField.create(SchemaPath.getSimplePath("dummy"), Types.required(MinorType.INT));
 
     startIndices = allocMetadataVector(tableSize, EMPTY_SLOT);
 
-    // Create the first batch holder 
+    // Create the first batch holder
     batchHolders = new ArrayList<BatchHolder>();
     addBatchHolder();
 
     doSetup(incomingBuild, incomingProbe);
 
-    currentIdxHolder = new IntHolder();    
+    currentIdxHolder = new IntHolder();
   }
 
   public int numBuckets() {
@@ -399,7 +398,7 @@ public abstract class HashTableTemplate implements HashTable {
   public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) {
 
     int hash = getHashBuild(incomingRowIdx);
-    int i = getBucketIndex(hash, numBuckets()); 
+    int i = getBucketIndex(hash, numBuckets());
     int startIdx = startIndices.getAccessor().get(i);
     int currentIdx;
     int currentIdxWithinBatch;
@@ -407,9 +406,9 @@ public abstract class HashTableTemplate implements HashTable {
     BatchHolder lastEntryBatch = null;
     int lastEntryIdxWithinBatch = EMPTY_SLOT;
 
-        
+
     if (startIdx == EMPTY_SLOT) {
-      // this is the first entry in this bucket; find the first available slot in the 
+      // this is the first entry in this bucket; find the first available slot in the
       // container of keys and values
       currentIdx = freeIndex++;
       addBatchIfNeeded(currentIdx);
@@ -430,8 +429,8 @@ public abstract class HashTableTemplate implements HashTable {
 
     bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK);
     currentIdxHolder.value = currentIdx;
-    
-    // if startIdx is non-empty, follow the hash chain links until we find a matching 
+
+    // if startIdx is non-empty, follow the hash chain links until we find a matching
     // key or reach the end of the chain
     while (true) {
       currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
@@ -439,7 +438,7 @@ public abstract class HashTableTemplate implements HashTable {
       if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
         htIdxHolder.value = currentIdxHolder.value;
         found = true;
-        break;        
+        break;
       }
       else if (currentIdxHolder.value == EMPTY_SLOT) {
         lastEntryBatch = bh;
@@ -462,7 +461,7 @@ public abstract class HashTableTemplate implements HashTable {
         htIdxHolder.value = currentIdx;
         return PutStatus.KEY_ADDED;
       }
-      else 
+      else
         return PutStatus.PUT_FAILED;
     }
 
@@ -471,7 +470,7 @@ public abstract class HashTableTemplate implements HashTable {
 
   private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
 
-    // resize hash table if needed and transfer the metadata 
+    // resize hash table if needed and transfer the metadata
     resizeAndRehashIfNeeded(currentIdx);
 
     addBatchIfNeeded(currentIdx);
@@ -495,7 +494,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     if (currentIdx == EMPTY_SLOT)
       return -1;
-    
+
     BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK);
     currentIdxHolder.value = currentIdx;
 
@@ -503,7 +502,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     while (true) {
       if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
-        found = true; 
+        found = true;
         break;
       } else if (currentIdxHolder.value == EMPTY_SLOT) {
         break;
@@ -511,18 +510,18 @@ public abstract class HashTableTemplate implements HashTable {
         bh = batchHolders.get( (currentIdxHolder.value >>> 16) & BATCH_MASK);
       }
     }
-   
+
     return found ? currentIdxHolder.value : -1;
   }
 
-  // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied 
+  // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
   // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
-  // the capacity, we will add a new BatchHolder. 
+  // the capacity, we will add a new BatchHolder.
   private BatchHolder addBatchIfNeeded(int currentIdx) {
     int totalBatchSize = batchHolders.size() * BATCH_SIZE;
-    
+
     if (currentIdx >= totalBatchSize) {
-      BatchHolder bh = addBatchHolder(); 
+      BatchHolder bh = addBatchHolder();
       if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
       return bh;
     }
@@ -538,19 +537,19 @@ public abstract class HashTableTemplate implements HashTable {
     return bh;
   }
 
-  // Resize the hash table if needed by creating a new one with double the number of buckets. 
+  // Resize the hash table if needed by creating a new one with double the number of buckets.
   // For each entry in the old hash table, re-hash it to the new table and update the metadata
-  // in the new table.. the metadata consists of the startIndices, links and hashValues. 
-  // Note that the keys stored in the BatchHolders are not moved around. 
+  // in the new table.. the metadata consists of the startIndices, links and hashValues.
+  // Note that the keys stored in the BatchHolders are not moved around.
   private void resizeAndRehashIfNeeded(int currentIdx) {
     if (numEntries < threshold)
       return;
 
     if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
 
-    // If the table size is already MAXIMUM_CAPACITY, don't resize 
-    // the table, but set the threshold to Integer.MAX_VALUE such that 
-    // future attempts to resize will return immediately. 
+    // If the table size is already MAXIMUM_CAPACITY, don't resize
+    // the table, but set the threshold to Integer.MAX_VALUE such that
+    // future attempts to resize will return immediately.
     if (tableSize == MAXIMUM_CAPACITY) {
       threshold = Integer.MAX_VALUE;
       return;
@@ -570,9 +569,9 @@ public abstract class HashTableTemplate implements HashTable {
     for (int i = 0; i < batchHolders.size(); i++) {
       BatchHolder bh = batchHolders.get(i) ;
       int batchStartIdx = i * BATCH_SIZE;
-      bh.rehash(tableSize, newStartIndices, batchStartIdx);  
-    }    
-    
+      bh.rehash(tableSize, newStartIndices, batchStartIdx);
+    }
+
     startIndices.clear();
     startIndices = newStartIndices;
 
@@ -607,11 +606,11 @@ public abstract class HashTableTemplate implements HashTable {
     return vector;
   }
 
-  // These methods will be code-generated in the context of the outer class 
+  // These methods will be code-generated in the context of the outer class
   protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
   protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) ;
   protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ;
 
-} 
+}
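
The table above stores entries in fixed-size BatchHolders and packs an entry's location into a single int, which is why lookups keep computing currentIdx >>> 16 and currentIdx & BATCH_MASK. A stand-alone sketch of that index arithmetic; the 64K-entries-per-BatchHolder constants and the power-of-two bucket masking are assumptions read off the shifts above, not copied constants:

    public class HashIndexSketch {
      // Assumed from the (currentIdx >>> 16) and (currentIdx & BATCH_MASK) usage above:
      // each BatchHolder holds 64K entries, so the low 16 bits index within a batch.
      static final int BATCH_SIZE = 1 << 16;
      static final int BATCH_MASK = BATCH_SIZE - 1;

      static int batchIndex(int currentIdx)       { return currentIdx >>> 16; }
      static int indexWithinBatch(int currentIdx) { return currentIdx & BATCH_MASK; }

      // Likely shape of getBucketIndex(hash, numBuckets) for a power-of-two bucket count.
      static int bucketIndex(int hash, int numBuckets) { return hash & (numBuckets - 1); }

      public static void main(String[] args) {
        int currentIdx = 3 * BATCH_SIZE + 17;               // entry #17 in the 4th BatchHolder
        System.out.println(batchIndex(currentIdx));         // 3
        System.out.println(indexWithinBatch(currentIdx));   // 17
      }
    }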
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 5fc3733..36428ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -84,9 +84,9 @@ import com.sun.codemodel.JExpr;
 public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
 
-  public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, 
+  public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP,
       ClassGenerator.DEFAULT_SCALAR_MAP);
-  public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null, 
+  public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
       ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
   public final MappingSet partitionMapping = new MappingSet("partitionIndex", null, "partitionVectors", null,
       ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
@@ -131,14 +131,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
     this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
     this.minorFragmentSampleCount = cache.getCounter(mapKey);
-    
-    SchemaPath outputPath = new SchemaPath(popConfig.getRef().getPath(), ExpressionPosition.UNKNOWN);
+
+    SchemaPath outputPath = popConfig.getRef();
     MaterializedField outputField = MaterializedField.create(outputPath, Types.required(TypeProtos.MinorType.INT));
     this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, context.getAllocator());
-    
+
   }
 
-  
+
   @Override
   public void cleanup() {
     super.cleanup();
@@ -152,10 +152,10 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     IterOutcome upstream;
 
     // Start collecting batches until recordsToSample records have been collected
-    
+
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
     builder.add(incoming);
-    
+
     recordsSampled += incoming.getRecordCount();
 
     outer: while (recordsSampled < recordsToSample) {
@@ -211,8 +211,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     containerToCache.clear();
     sampleToSave.clear();
     return true;
-    
-    
+
+
   }
 
   /**
@@ -221,7 +221,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
    * the samples, sorts all the records, builds a partition table, and attempts to push the partition table to the
    * distributed cache. Whichever table gets pushed first becomes the table used by all fragments for partitioning.
-   * 
+   *
    * @return True is successful. False if failed.
    */
   private boolean getPartitionVectors() {
@@ -232,7 +232,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       if (!saveSamples()){
         return false;
       }
-      
+
       VectorAccessibleSerializable finalTable = null;
 
       long val = minorFragmentSampleCount.incrementAndGet();
@@ -282,7 +282,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   private void buildTable() throws SchemaChangeException, ClassTransformationException, IOException {
 
     // Get all samples from distributed map
-    
+
     SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
     for (VectorAccessibleSerializable w : mmap.get(mapKey)) {
       containerBuilder.add(w.get());
@@ -293,7 +293,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     List<Ordering> orderDefs = Lists.newArrayList();
     int i = 0;
     for (Ordering od : popConfig.getOrderings()) {
-      SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
+      SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
       orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
     }
 
@@ -317,7 +317,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
     VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
     tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
-    
+
     candidatePartitionTable.clear();
     allSamplesContainer.clear();
     containerBuilder.clear();
@@ -330,7 +330,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
    * Ordering determines the value of each column. These records will later be sorted based on the values in each
    * column, in the same order as the orderings.
-   * 
+   *
    * @param sv4
    * @param incoming
    * @param outgoing
@@ -348,7 +348,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     int i = 0;
     for (Ordering od : orderings) {
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
-      SchemaPath schemaPath = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
+      SchemaPath schemaPath = SchemaPath.getSimplePath("f" + i++);
       TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
           .clearMode().setMode(TypeProtos.DataMode.REQUIRED);
       TypeProtos.MajorType newType = builder.build();
@@ -423,9 +423,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
       if (!getPartitionVectors()){
         cleanup();
-        return IterOutcome.STOP;   
+        return IterOutcome.STOP;
       }
-      
+
       batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
       first = false;
 
@@ -497,7 +497,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   /**
    * Sets up projection that will transfer all of the columns in batch, and also populate the partition column based on
    * which partition a record falls into in the partition table
-   * 
+   *
    * @param batch
    * @throws SchemaChangeException
    */
@@ -547,7 +547,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
     try {
       this.projector = context.getImplementationClass(cg);
-      projector.setup(context, batch, this, transfers, partitionVectors, partitions, new SchemaPath(popConfig.getRef().getPath(), ExpressionPosition.UNKNOWN));
+      projector.setup(context, batch, this, transfers, partitionVectors, partitions, popConfig.getRef());
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
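
This file, PriorityQueueTemplate, and HashTableTemplate all replace the SchemaPath constructor that took an ExpressionPosition with the SchemaPath.getSimplePath(...) factory, in line with the commit's move of schema path parsing to Antlr. A before/after sketch of the call-site change:

    import org.apache.drill.common.expression.SchemaPath;

    public class SchemaPathSketch {
      static SchemaPath sampleColumn(int i) {
        // Before: new SchemaPath("f" + i, ExpressionPosition.UNKNOWN)
        // After:  the static factory builds the single-name path directly.
        return SchemaPath.getSimplePath("f" + i);
      }
    }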

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index cd7e632..94fd385 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -98,7 +98,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
 
   /**
    * Send the record batch to the target node, then reset the value vectors
-   * 
+   *
    * @return true if a flush was needed; otherwise false
    * @throws SchemaChangeException
    */
@@ -106,7 +106,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
     final ExecProtos.FragmentHandle handle = context.getHandle();
 
     if (recordCount != 0) {
-      
+
       for(VectorWrapper<?> w : vectorContainer){
         w.getValueVector().getMutator().setValueCount(recordCount);
       }
@@ -147,7 +147,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
     recordCount = 0;
     vectorContainer.zeroVectors();
     for (VectorWrapper<?> v : vectorContainer) {
-      logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
+//      logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush.");
       VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE);
     }
     if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
@@ -173,10 +173,10 @@ public class OutgoingRecordBatch implements VectorAccessible {
       ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
       VectorAllocator.getAllocator(outgoingVector, 100).alloc(recordCapacity);
       vectorContainer.add(outgoingVector);
-      logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
+//      logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
     }
     outSchema = bldr.build();
-    logger.debug("Initialized OutgoingRecordBatch.  RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
+//    logger.debug("Initialized OutgoingRecordBatch.  RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
   }
 
   /**
@@ -226,11 +226,11 @@ public class OutgoingRecordBatch implements VectorAccessible {
     return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
   }
 
-  
+
   private StatusHandler statusHandler = new StatusHandler();
   private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
     RpcException ex;
-    
+
     @Override
     public void success(Ack value, ByteBuf buffer) {
       sendCount.decrement();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4342f52..aaee8e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -24,10 +24,10 @@ import java.util.Set;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -58,11 +58,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
 
   private Projector projector;
   private List<ValueVector> allocationVectors;
-  
+
   public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
     super(pop, context, incoming);
   }
-  
+
   @Override
   public int getRecordCount() {
     return incoming.getRecordCount();
@@ -85,32 +85,27 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
   private FieldReference getRef(NamedExpression e){
     FieldReference ref = e.getRef();
     PathSegment seg = ref.getRootSegment();
-    if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
-      return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
-    }
+
+//    if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
+//      return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
+//    }
     return ref;
   }
-  
+
   private boolean isAnyWildcard(List<NamedExpression> exprs){
     for(NamedExpression e : exprs){
       if(isWildcard(e)) return true;
     }
     return false;
   }
-  
+
   private boolean isWildcard(NamedExpression ex){
-    LogicalExpression expr = ex.getExpr();
-    LogicalExpression ref = ex.getRef();
-    if(expr instanceof SchemaPath && ref instanceof SchemaPath){
-      PathSegment e = ((SchemaPath) expr).getRootSegment();
-      PathSegment n = ((SchemaPath) ref).getRootSegment();
-      if(e.isNamed() && e.getNameSegment().getPath().equals("*") && n.isNamed() && n.getChild() != null && n.getChild().isNamed() && n.getChild().getNameSegment().getPath().equals("*")){
-        return true;
-      }
-    }
-    return false;
+    if( !(ex.getExpr() instanceof SchemaPath)) return false;
+    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+    NameSegment ref = ex.getRef().getRootSegment();
+    return ref.getPath().equals("*") && expr.getPath().equals("*");
   }
-  
+
   @Override
   protected void setupNewSchema() throws SchemaChangeException{
     this.allocationVectors = Lists.newArrayList();
@@ -118,7 +113,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     final List<NamedExpression> exprs = popConfig.getExprs();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
-    
+
     final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
 
     Set<Integer> transferFieldIds = new HashSet();
@@ -128,7 +123,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
     if(isAnyWildcard){
       for(VectorWrapper<?> wrapper : incoming){
         ValueVector vvIn = wrapper.getValueVector();
-        TransferPair tp = wrapper.getValueVector().getTransferPair(new FieldReference(vvIn.getField().getName()));
+        String name = vvIn.getField().getDef().getName(vvIn.getField().getDef().getNameCount() - 1).getName();
+        FieldReference ref = new FieldReference(name);
+        TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
         transfers.add(tp);
         container.add(tp.getTo());
       }
@@ -154,7 +151,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
           transfers.add(tp);
           container.add(tp.getTo());
           transferFieldIds.add(vectorRead.getFieldId().getFieldId());
-          logger.debug("Added transfer.");
+//          logger.debug("Added transfer.");
         }else{
           // need to do evaluation.
           ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
@@ -162,15 +159,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
           TypedFieldId fid = container.add(vector);
           ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
           cg.addExpr(write);
-          logger.debug("Added eval.");
+//          logger.debug("Added eval.");
         }
     }
 
-      
+
     }
-    
+
     container.buildSchema(incoming.getSchema().getSelectionVectorMode());
-    
+
     try {
       this.projector = context.getImplementationClass(cg.getCodeGenerator());
       projector.setup(context, incoming, this, transfers);
@@ -178,6 +175,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
-  
-  
+
+
 }

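The rewritten isWildcard() above reduces to comparing the root segments of the expression and its output reference against "*". A minimal, self-contained sketch of that test, with plain strings standing in for Drill's NameSegment paths (an illustrative simplification, not the actual API):

// Hypothetical stand-in for the simplified wildcard test: an expression is
// the wildcard projection exactly when both root paths are "*".
class WildcardCheckSketch {
  static boolean isWildcard(String exprRootPath, String refRootPath) {
    return "*".equals(exprRootPath) && "*".equals(refRootPath);
  }

  public static void main(String[] args) {
    System.out.println(isWildcard("*", "*")); // true  -> transfer every incoming vector
    System.out.println(isWildcard("a", "*")); // false -> falls through to expression evaluation
  }
}

When the test passes, the wildcard branch in setupNewSchema() simply sets up a transfer pair per incoming vector instead of generating evaluation code.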
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 5cdab96..29e629a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -59,7 +59,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   private final SortRecordBatchBuilder builder;
   private Sorter sorter;
   private BatchSchema schema;
-  
+
   public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) {
     super(popConfig, context);
     this.incoming = incoming;
@@ -87,8 +87,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     return builder.getSv4();
   }
 
-  
-  
+
+
   @Override
   public void cleanup() {
     super.cleanup();
@@ -105,8 +105,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
         return IterOutcome.NONE;
       }
     }
-    
-    
+
+
     try{
       outer: while (true) {
         IterOutcome upstream = incoming.next();
@@ -134,19 +134,19 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
           throw new UnsupportedOperationException();
         }
       }
-      
-      if (schema == null){
+
+      if (schema == null || builder.isEmpty()){
         // builder may be null at this point if the first incoming batch is empty
         return IterOutcome.NONE;
       }
-        
+
       builder.build(context, container);
       sorter = createNewSorter();
       sorter.setup(context, getSelectionVector4(), this.container);
       sorter.sort(getSelectionVector4(), this.container);
 
       return IterOutcome.OK_NEW_SCHEMA;
-      
+
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
       kill();
       logger.error("Failure during query", ex);
@@ -163,19 +163,19 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
     final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
     final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
-    
+
     return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
   }
-  
+
   public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
           throws ClassTransformationException, IOException, SchemaChangeException{
     CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     ClassGenerator<Sorter> g = cg.getRoot();
     g.setMappingSet(mainMapping);
-    
+
     for(Ordering od : orderings){
       // first, we rewrite the evaluation stack for each side of the comparison.
-      ErrorCollector collector = new ErrorCollectorImpl(); 
+      ErrorCollector collector = new ErrorCollectorImpl();
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
       if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       g.setMappingSet(leftMapping);
@@ -183,26 +183,26 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
       g.setMappingSet(rightMapping);
       HoldingContainer right = g.addExpr(expr, false);
       g.setMappingSet(mainMapping);
-      
+
       // next we wrap the two comparison sides and add the expression block for the comparison.
       LogicalExpression fh = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry());
       HoldingContainer out = g.addExpr(fh, false);
       JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-      
+
       if(od.getDirection() == Direction.ASCENDING){
         jc._then()._return(out.getValue());
       }else{
         jc._then()._return(out.getValue().minus());
       }
     }
-    
+
     g.getEvalBlock()._return(JExpr.lit(0));
-    
+
     return context.getImplementationClass(cg);
 
 
   }
-  
+
   @Override
   public WritableBatch getWritableBatch() {
     throw new UnsupportedOperationException("A sort batch is not writable.");
@@ -213,7 +213,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
     incoming.kill();
   }
 
-  
-  
+
+
 
 }

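createNewSorter() above generates a comparator whose body is a chain of per-ordering comparisons: on the first non-zero result it returns the value as-is for ASCENDING and negated for DESCENDING, falling through to 0 when every key ties. A plain-Java comparator mirroring that generated shape (the two int keys are hypothetical; the generated class compares materialized vector values instead):

// Illustrative comparator with the same control flow as the generated code:
// compare key by key, return on the first inequality, 0 when all keys tie.
import java.util.Comparator;

final class GeneratedSorterSketch implements Comparator<int[]> {
  @Override
  public int compare(int[] left, int[] right) {
    int out = Integer.compare(left[0], right[0]); // first ordering, ASCENDING
    if (out != 0) return out;
    out = Integer.compare(left[1], right[1]);     // second ordering, DESCENDING
    if (out != 0) return -out;
    return 0;                                     // all ordering keys equal
  }
}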
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 8980fdc..bf9db9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -38,7 +38,7 @@ import com.google.common.collect.Lists;
 
 public class SortRecordBatchBuilder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
-  
+
   private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
 
   private int recordCount;
@@ -47,12 +47,12 @@ public class SortRecordBatchBuilder {
   private final long maxBytes;
   private SelectionVector4 sv4;
   final PreAllocator svAllocator;
-  
+
   public SortRecordBatchBuilder(BufferAllocator a, long maxBytes){
     this.maxBytes = maxBytes;
     this.svAllocator = a.getNewPreAllocator();
   }
-  
+
   private long getSize(VectorAccessible batch){
     long bytes = 0;
     for(VectorWrapper<?> v : batch){
@@ -60,11 +60,11 @@ public class SortRecordBatchBuilder {
     }
     return bytes;
   }
-  
+
   /**
-   * Add another record batch to the set of record batches.  
+   * Add another record batch to the set of record batches.
    * @param batch
-   * @return True if the requested add completed successfully.  Returns false in the case that this builder is full and cannot receive additional packages. 
+   * @return True if the requested add completed successfully.  Returns false in the case that this builder is full and cannot receive additional packages.
    * @throws SchemaChangeException
    */
   public boolean add(VectorAccessible batch){
@@ -79,7 +79,7 @@ public class SortRecordBatchBuilder {
     if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
     if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
     if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false;  // sv allocation available.
-      
+
 
     if (batch.getRecordCount() == 0) return true;
     RecordBatchData bd = new RecordBatchData(batch);
@@ -112,15 +112,21 @@ public class SortRecordBatchBuilder {
     return true;
   }
 
+  public boolean isEmpty(){
+    return batches.isEmpty();
+  }
+
   public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{
     outputContainer.clear();
     if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
     if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
-    assert batches.keySet().size() > 0;
+    if(batches.keys().size() < 1){
+      assert false : "Invalid to have an empty set of batches with no schemas.";
+    }
     sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
-    
+
     // now we're going to generate the sv4 pointers
     switch(schema.getSelectionVectorMode()){
     case NONE: {
@@ -150,7 +156,7 @@ public class SortRecordBatchBuilder {
     default:
       throw new UnsupportedOperationException();
     }
-    
+
     // next, we'll create lists of each of the vector types.
     ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
     for(RecordBatchData rbd : batches.values()){
@@ -158,12 +164,12 @@ public class SortRecordBatchBuilder {
         vectors.put(v.getField(), v);
       }
     }
-    
+
     for(MaterializedField f : vectors.keySet()){
       List<ValueVector> v = vectors.get(f);
       outputContainer.addHyperList(v, false);
     }
-    
+
     outputContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
   }
 
@@ -177,7 +183,7 @@ public class SortRecordBatchBuilder {
     }
     if(sv4 != null) sv4.clear();
   }
-  
+
   public List<VectorContainer> getHeldRecordBatches() {
     ArrayList<VectorContainer> containerList = Lists.newArrayList();
     for (BatchSchema bs : batches.keySet()) {
@@ -190,5 +196,5 @@ public class SortRecordBatchBuilder {
     batches.clear();
     return containerList;
   }
-  
+
 }

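The new isEmpty() pairs with the guard added in SortBatch: when every incoming batch was empty, the sort reports NONE rather than building a selection vector over nothing. A hedged sketch of the capacity bookkeeping behind add()'s boolean result, with simplified stand-ins for the builder's internals (the selection-vector pre-allocation check is omitted):

// Simplified model of the add() checks: refuse a batch that would exceed the
// memory budget or the 65535-batch limit of the four-byte selection vector;
// accept empty batches without recording them.
final class BoundedBatchBuilderSketch {
  private long runningBytes;
  private int runningBatches;
  private final long maxBytes;

  BoundedBatchBuilderSketch(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  boolean add(long batchBytes, int recordCount) {
    if (batchBytes + runningBytes > maxBytes) return false;     // memory budget exhausted
    if (runningBatches + 1 > Character.MAX_VALUE) return false; // SV4 batch-index limit
    if (recordCount == 0) return true;                          // empty batch: nothing to hold
    runningBytes += batchBytes;
    runningBatches++;
    return true;
  }

  boolean isEmpty() {
    return runningBatches == 0;
  }
}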
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 379fad2..8d3a3e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -29,26 +29,30 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-public class IteratorValidatorBatchIterator implements RecordBatch{
+public class IteratorValidatorBatchIterator implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
 
   private IterOutcome state = IterOutcome.NOT_YET;
   private final RecordBatch incoming;
-  
-  public IteratorValidatorBatchIterator(RecordBatch incoming){
+
+  public IteratorValidatorBatchIterator(RecordBatch incoming) {
     this.incoming = incoming;
   }
-  
-  private void validateReadState(){
-    switch(state){
+
+  private void validateReadState() {
+    switch (state) {
     case OK:
     case OK_NEW_SCHEMA:
       return;
     default:
-      throw new IllegalStateException(String.format("You tried to do a batch data read operation when you were in a state of %s.  You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.", state.name()));
+      throw new IllegalStateException(
+          String
+              .format(
+                  "You tried to do a batch data read operation when you were in a state of %s.  You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.",
+                  state.name()));
     }
   }
-  
+
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     validateReadState();
@@ -105,10 +109,17 @@ public class IteratorValidatorBatchIterator implements RecordBatch{
   public IterOutcome next() {
     if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
     state = incoming.next();
-    
-    if ((state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) && incoming.getRecordCount() > MAX_BATCH_SIZE)
-      throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d",  incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE)); 
-    
+
+    if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
+      BatchSchema schema = incoming.getSchema();
+      if(schema.getFieldCount() == 0){
+        throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed.");
+      }
+     if(incoming.getRecordCount() > MAX_BATCH_SIZE){
+       throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d",  incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE));
+      }
+    }
+
     return state;
   }
 

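next() now rejects two situations that previously slipped through: a data-bearing batch with an empty schema, and a batch whose record count exceeds MAX_BATCH_SIZE. A stand-alone rendering of those checks, with plain ints in place of the BatchSchema and RecordBatch accessors:

// Hypothetical helper showing the two invariants enforced after a successful
// next(): a non-empty schema and a record count under the configured ceiling.
final class BatchValidationSketch {
  static void validate(int fieldCount, int recordCount, int maxBatchSize) {
    if (fieldCount == 0) {
      throw new IllegalStateException("Incoming batch has an empty schema. This is not allowed.");
    }
    if (recordCount > maxBatchSize) {
      throw new IllegalStateException(String.format(
          "Incoming batch has size %d, which is beyond the limit of %d", recordCount, maxBatchSize));
    }
  }
}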
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 42835fc..8bb3d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -170,7 +170,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         int count = selector.next();
         if(count > 0){
           long t = w.elapsed(TimeUnit.MICROSECONDS);
-          logger.debug("Took {} us to merge {} records", t, count);
+//          logger.debug("Took {} us to merge {} records", t, count);
           container.setRecordCount(count);
           return IterOutcome.OK;
         }else{
@@ -185,7 +185,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         Stopwatch watch = new Stopwatch();
         watch.start();
         IterOutcome upstream = incoming.next();
-        logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
+//        logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
         switch (upstream) {
         case NONE:
           break outer;
@@ -215,7 +215,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           Stopwatch w = new Stopwatch();
           w.start();
           sorter.sort(sv2);
-          logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), sv2.getCount());
+//          logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), sv2.getCount());
           batchGroups.add(new BatchGroup(new RecordBatchData(incoming).getContainer(), sv2));
           batchesSinceLastSpill++;
           if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) {
@@ -223,7 +223,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
             batchesSinceLastSpill = 0;
           }
           long t = w.elapsed(TimeUnit.MICROSECONDS);
-          logger.debug("Took {} us to sort {} records", t, count);
+//          logger.debug("Took {} us to sort {} records", t, count);
           break;
         default:
           throw new UnsupportedOperationException();
@@ -343,11 +343,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
       int i = 0;
       for (BatchGroup group : batchGroupList) {
-        vectors[i++] = group.getValueAccessorById(group.getValueVectorId(new SchemaPath(field.getName(),ExpressionPosition.UNKNOWN)).getFieldId(),
+        vectors[i++] = group.getValueAccessorById(group.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
                 field.getValueClass()).getValueVector();
         if (group.hasSecond()) {
           VectorContainer c = group.getSecondContainer();
-          vectors[i++] = c.getValueAccessorById(c.getValueVectorId(new SchemaPath(field.getName(),ExpressionPosition.UNKNOWN)).getFieldId(),
+          vectors[i++] = c.getValueAccessorById(c.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
                   field.getValueClass()).getValueVector();
         } else {
           vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
index 9d02c44..cf3d188 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillProjectRelBase.java
@@ -40,7 +40,7 @@ import com.google.common.collect.Lists;
 /**
  *
  * Base class for logical and physical Project implemented in Drill
- */ 
+ */
 public abstract class DrillProjectRelBase extends ProjectRelBase implements DrillRelNode {
   protected DrillProjectRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
       RelDataType rowType) {
@@ -61,7 +61,7 @@ public abstract class DrillProjectRelBase extends ProjectRelBase implements Dril
     List<NamedExpression> expressions = Lists.newArrayList();
     for (Pair<RexNode, String> pair : projects()) {
       LogicalExpression expr = DrillOptiq.toDrill(context, getChild(), pair.left);
-      expressions.add(new NamedExpression(expr, new FieldReference("output." + pair.right)));
+      expressions.add(new NamedExpression(expr, new FieldReference(pair.right)));
     }
     return expressions;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
index ae777cb..d19b7a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
@@ -64,13 +64,13 @@ public class DrillProjectRel extends DrillProjectRelBase implements DrillRel {
     }
     return builder.build();
   }
-  
+
   public static DrillProjectRel convert(Project project, ConversionContext context) throws InvalidRelException{
     RelNode input = context.toRel(project.getInput());
     List<RelDataTypeField> fields = Lists.newArrayList();
     List<RexNode> exps = Lists.newArrayList();
     for(NamedExpression expr : project.getSelections()){
-      fields.add(new RelDataTypeFieldImpl(expr.getRef().getPath().toString(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) ));
+      fields.add(new RelDataTypeFieldImpl(expr.getRef().getRootSegment().getPath(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) ));
       exps.add(context.toRex(expr.getExpr()));
     }
     return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index e5cc730..1492a28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -58,12 +58,12 @@ public class DrillRuleSets {
       // Add support for WHERE style joins.
       PushFilterPastProjectRule.INSTANCE,
       PushFilterPastJoinRule.FILTER_ON_JOIN,
-      PushJoinThroughJoinRule.RIGHT, 
-      PushJoinThroughJoinRule.LEFT, 
+      PushJoinThroughJoinRule.RIGHT,
+      PushJoinThroughJoinRule.LEFT,
       // End support for WHERE style joins.
-      
+
       //Add back rules
-      
+
       ExpandConversionRule.INSTANCE,
 //      SwapJoinRule.INSTANCE,
       RemoveDistinctRule.INSTANCE,
@@ -80,7 +80,7 @@ public class DrillRuleSets {
 //      SwapJoinRule.INSTANCE, //
 //      PushJoinThroughJoinRule.RIGHT, //
 //      PushJoinThroughJoinRule.LEFT, //
-//      PushSortPastProjectRule.INSTANCE, //      
+//      PushSortPastProjectRule.INSTANCE, //
 
       ////////////////////////////////
       DrillScanRule.INSTANCE,
@@ -91,10 +91,10 @@ public class DrillRuleSets {
       DrillLimitRule.INSTANCE,
       DrillSortRule.INSTANCE,
       DrillJoinRule.INSTANCE,
-      DrillUnionRule.INSTANCE,      
+      DrillUnionRule.INSTANCE,
       MergeProjectRule.INSTANCE
       ));
-  
+
   public static final RuleSet DRILL_PHYSICAL_MEM = new DrillRuleSet(ImmutableSet.of( //
 //      DrillScanRule.INSTANCE,
 //      DrillFilterRule.INSTANCE,
@@ -115,8 +115,9 @@ public class DrillRuleSets {
       StreamAggPrule.INSTANCE,
       MergeJoinPrule.INSTANCE,
       FilterPrule.INSTANCE,
-      LimitPrule.INSTANCE,
-      PushLimitToTopN.INSTANCE
+      LimitPrule.INSTANCE
+
+//      PushLimitToTopN.INSTANCE
 
 //    ExpandConversionRule.INSTANCE,
 //    SwapJoinRule.INSTANCE,
@@ -135,14 +136,14 @@ public class DrillRuleSets {
 //    SwapJoinRule.INSTANCE, //
 //    PushJoinThroughJoinRule.RIGHT, //
 //    PushJoinThroughJoinRule.LEFT, //
-//    PushSortPastProjectRule.INSTANCE, //      
+//    PushSortPastProjectRule.INSTANCE, //
     ));
-  
+
   public static final RuleSet DRILL_PHYSICAL_DISK = new DrillRuleSet(ImmutableSet.of( //
       ProjectPrule.INSTANCE
-  
+
     ));
-  
+
   private static class DrillRuleSet implements RuleSet{
     final ImmutableSet<RelOptRule> rules;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
index 9456a81..128ba28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
@@ -25,17 +25,17 @@ import org.apache.drill.common.expression.SchemaPath;
 
 public class ExprHelper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExprHelper.class);
-  
+
   private final static String COMPOUND_FAIL_MESSAGE = "The current Optiq based logical plan interpreter does not support complicated expressions.  For Order By and Filter";
-  
+
   public static String getAggregateFieldName(FunctionCall c){
     List<LogicalExpression> exprs = c.args;
     if(exprs.size() != 1) throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
     return getFieldName(exprs.iterator().next());
   }
-  
+
   public static String getFieldName(LogicalExpression e){
-    if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString();
+    //if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString();
     throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
index 8573fb2..b75fb40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner.physical;
 
 import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.relopt.RelOptPlanner;
 import org.eigenbase.relopt.RelTrait;
 import org.eigenbase.relopt.RelTraitDef;
 
@@ -30,12 +31,12 @@ public class DrillDistributionTrait implements RelTrait {
   public static DrillDistributionTrait SINGLETON = new DrillDistributionTrait(DistributionType.SINGLETON);
   public static DrillDistributionTrait RANDOM_DISTRIBUTED = new DrillDistributionTrait(DistributionType.RANDOM_DISTRIBUTED);
   public static DrillDistributionTrait ANY = new DrillDistributionTrait(DistributionType.ANY);
-  
+
   public static DrillDistributionTrait DEFAULT = ANY;
-  
-  private DistributionType type;  
+
+  private DistributionType type;
   private final ImmutableList<DistributionField> fields;
-  
+
   private DrillDistributionTrait(DistributionType type) {
     assert (type == DistributionType.SINGLETON || type == DistributionType.RANDOM_DISTRIBUTED || type == DistributionType.ANY
             || type == DistributionType.ROUND_ROBIN_DISTRIBUTED || type == DistributionType.BROADCAST_DISTRIBUTED);
@@ -44,11 +45,15 @@ public class DrillDistributionTrait implements RelTrait {
   }
 
   public DrillDistributionTrait(DistributionType type, ImmutableList<DistributionField> fields) {
-    assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);   
+    assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
     this.type = type;
     this.fields = fields;
   }
 
+  @Override
+  public void register(RelOptPlanner planner) {
+  }
+
   public boolean subsumes(RelTrait trait) {
 
     if (trait instanceof DrillDistributionTrait) {
@@ -65,19 +70,19 @@ public class DrillDistributionTrait implements RelTrait {
           assert(thisFields.size() > 0 && requiredFields.size() > 0);
 
           // A subset of the required distribution columns can satisfy (subsume) the requirement
-          // e.g: required distribution: {a, b, c} 
+          // e.g: required distribution: {a, b, c}
           // Following can satisfy the requirements: {a}, {b}, {c}, {a, b}, {b, c}, {a, c} or {a, b, c}
           return (requiredFields.containsAll(thisFields));
         }
         else if (requiredDist == DistributionType.RANDOM_DISTRIBUTED) {
-          return true; // hash distribution subsumes random distribution and ANY distribution 
+          return true; // hash distribution subsumes random distribution and ANY distribution
         }
       }
     }
 
     return this.equals(trait);
   }
-  
+
   public RelTraitDef<DrillDistributionTrait> getTraitDef() {
     return DrillDistributionTraitDef.INSTANCE;
   }
@@ -93,7 +98,7 @@ public class DrillDistributionTrait implements RelTrait {
   public int hashCode() {
     return  fields == null ? type.hashCode() : type.hashCode() | fields.hashCode() << 4 ;
   }
-  
+
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
@@ -110,13 +115,13 @@ public class DrillDistributionTrait implements RelTrait {
     return fields == null ? this.type.toString() : this.type.toString() + "(" + fields + ")";
   }
 
-  
+
   public static class DistributionField {
     /**
      * 0-based index of field being DISTRIBUTED.
      */
     private final int fieldId;
-    
+
     public DistributionField (int fieldId) {
       this.fieldId = fieldId;
     }
@@ -128,18 +133,18 @@ public class DrillDistributionTrait implements RelTrait {
       DistributionField other = (DistributionField) obj;
       return this.fieldId == other.fieldId;
     }
-    
+
     public int hashCode() {
       return this.fieldId;
     }
-    
+
     public int getFieldId() {
       return this.fieldId;
     }
-    
+
     public String toString() {
       return String.format("[$%s]", this.fieldId);
     }
   }
-  
+
 }

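The subsumption rule spelled out in the comment above, that a hash distribution on any subset of the required columns satisfies the requirement, comes down to a containsAll() test. An illustrative sketch with integer column ids standing in for DistributionField (0, 1, 2 play the roles of a, b, c):

// Hypothetical rendering of the subsumption check: this trait's hash columns
// must all appear among the required distribution columns.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class DistributionSubsumeSketch {
  static boolean subsumes(Set<Integer> thisFields, Set<Integer> requiredFields) {
    return requiredFields.containsAll(thisFields);
  }

  public static void main(String[] args) {
    Set<Integer> required = new HashSet<>(Arrays.asList(0, 1, 2));              // {a, b, c}
    System.out.println(subsumes(new HashSet<>(Arrays.asList(1)), required));    // true: {b} is enough
    System.out.println(subsumes(new HashSet<>(Arrays.asList(0, 3)), required)); // false: brings a new column
  }
}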
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ecaa838f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index f392a18..0fc3abd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -40,19 +40,18 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     return new FilterPrel(getCluster(), traitSet, sole(inputs), getCondition());
   }
-  
+
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     Prel child = (Prel) this.getChild();
-    
+
     PhysicalOperator childPOP = child.getPhysicalOperator(creator);
-    
-    //Currently, Filter only accepts "NONE", SV2, SV4. 
-    
+
+    //Currently, Filter accepts "NONE", SV2, SV4.
+
     Filter p = new Filter(childPOP, getFilterExpression(new DrillParseContext()), 1.0f);
-    creator.addPhysicalOperator(p);
-    
+
     return p;
   }
-  
+
 }