Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/22 03:14:45 UTC

[08/24] status changes

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 88bada5..5b61a82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -79,64 +79,69 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public IterOutcome next() {
-
-    // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null) {
-      IterOutcome outcome = incoming.next();
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()){
-          done = true;
-          return IterOutcome.STOP;
+    stats.startProcessing();
+    try{
+      // this is only called on the first batch. Beyond this, the aggregator manages batches.
+      if (aggregator == null) {
+        IterOutcome outcome = next(incoming);
+        logger.debug("Next outcome of {}", outcome);
+        switch (outcome) {
+        case NONE:
+        case NOT_YET:
+        case STOP:
+          return outcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()){
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case OK:
+          throw new IllegalStateException("You should never get a first batch without a new schema");
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", outcome));
         }
-        break;
-      case OK:
-        throw new IllegalStateException("You should never get a first batch without a new schema");
-      default:
-        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
       }
-    }
 
-    while(true){
-      AggOutcome out = aggregator.doWork();
-      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch(out){
-      case CLEANUP_AND_RETURN:
-        container.clear();
-        done = true;
-        return aggregator.getOutcome();
-      case RETURN_OUTCOME:
-        return aggregator.getOutcome();
-      case UPDATE_AGGREGATOR:
-        aggregator = null;
-        if(!createAggregator()){
-          return IterOutcome.STOP;
+      while(true){
+        AggOutcome out = aggregator.doWork();
+        logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+        switch(out){
+        case CLEANUP_AND_RETURN:
+          container.clear();
+          done = true;
+          return aggregator.getOutcome();
+        case RETURN_OUTCOME:
+          return aggregator.getOutcome();
+        case UPDATE_AGGREGATOR:
+          aggregator = null;
+          if(!createAggregator()){
+            return IterOutcome.STOP;
+          }
+          continue;
+        default:
+          throw new IllegalStateException(String.format("Unknown state %s.", out));
         }
-        continue;
-      default:
-        throw new IllegalStateException(String.format("Unknown state %s.", out));
       }
+    }finally{
+      stats.stopProcessing();
     }
-    
   }
 
-  
-  
+
+
   /**
    * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
   * and informing the context of the failure state, as well as informing the upstream operators.
-   * 
+   *
    * @return true if the aggregator was setup successfully. false if there was a failure.
    */
   private boolean createAggregator() {
     logger.debug("Creating new aggregator.");
     try{
+      stats.startSetup();
       this.aggregator = createAggregatorInternal();
+      stats.stopSetup();
       return true;
     }catch(SchemaChangeException | ClassTransformationException | IOException ex){
       context.fail(ex);
@@ -153,13 +158,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     container.clear();
     List<VectorAllocator> allocators = Lists.newArrayList();
-    
+
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().length];
     LogicalExpression[] valueExprs = new LogicalExpression[popConfig.getExprs().length];
     TypedFieldId[] keyOutputIds = new TypedFieldId[popConfig.getKeys().length];
-    
+
     ErrorCollector collector = new ErrorCollectorImpl();
-    
+
     for(int i =0; i < keyExprs.length; i++){
       NamedExpression ne = popConfig.getKeys()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
@@ -170,38 +175,38 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       allocators.add(VectorAllocator.getAllocator(vector, 50));
       keyOutputIds[i] = container.add(vector);
     }
-    
+
     for(int i =0; i < valueExprs.length; i++){
       NamedExpression ne = popConfig.getExprs()[i];
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
       if(expr == null) continue;
-      
+
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
       allocators.add(VectorAllocator.getAllocator(vector, 50));
       TypedFieldId id = container.add(vector);
       valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
     }
-    
+
     if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
-    
+
     setupIsSame(cg, keyExprs);
     setupIsSameApart(cg, keyExprs);
     addRecordValues(cg, valueExprs);
     outputRecordKeys(cg, keyOutputIds, keyExprs);
     outputRecordKeysPrev(cg, keyOutputIds, keyExprs);
-    
+
     cg.getBlock("resetValues")._return(JExpr.TRUE);
     getIndex(cg);
-    
+
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
     agg.setup(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
     return agg;
   }
-  
-  
-  
+
+
+
   private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
   private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
   private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
@@ -214,19 +219,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       HoldingContainer first = cg.addExpr(expr, false);
       cg.setMappingSet(IS_SAME_I2);
       HoldingContainer second = cg.addExpr(expr, false);
-      
+
       LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
       HoldingContainer out = cg.addExpr(fh, false);
       cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
     cg.getEvalBlock()._return(JExpr.TRUE);
   }
-  
+
   private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null); // the internal batch changes each time so we need to redo setup.
   private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSamePrev", null, null);
   private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
   private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
-  
+
   private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
     cg.setMappingSet(ISA_B1);
     for(LogicalExpression expr : keyExprs){
@@ -242,11 +247,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }
     cg.getEvalBlock()._return(JExpr.TRUE);
   }
-  
+
   private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null);
   private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
   private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
-  
+
   private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){
     cg.setMappingSet(EVAL);
     for(LogicalExpression ex : valueExprs){
@@ -255,9 +260,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }
     cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
-  
+
   private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
-  
+
   private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
     cg.setMappingSet(RECORD_KEYS);
     for(int i =0; i < keyExprs.length; i++){
@@ -266,13 +271,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }
     cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
-  
+
   private final GeneratorMapping PREVIOUS_KEYS_OUT = GeneratorMapping.create("setupInterior", "outputRecordKeysPrev", null, null);
   private final MappingSet RECORD_KEYS_PREV_OUT = new MappingSet("previousIndex", "outIndex", "previous", "outgoing", PREVIOUS_KEYS_OUT, PREVIOUS_KEYS_OUT);
 
   private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
   private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);
-  
+
   private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
     cg.setMappingSet(RECORD_KEYS_PREV);
 
@@ -285,11 +290,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       cg.setMappingSet(RECORD_KEYS_PREV_OUT);
       HoldingContainer outerExpression = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), false);
       cg.getBlock(BlockType.EVAL)._if(outerExpression.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
-      
+
     }
     cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
-  
+
   private void getIndex(ClassGenerator<StreamingAggregator> g){
     switch(incoming.getSchema().getSelectionVectorMode()){
     case FOUR_BYTE: {
@@ -308,12 +313,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       g.getBlock("getVectorIndex")._return(var.invoke("getIndex").arg(JExpr.direct("recordIndex")));;
       return;
     }
-     
+
     default:
       throw new IllegalStateException();
-      
+
     }
-   
+
   }
 
   @Override
@@ -321,7 +326,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     super.cleanup();
     incoming.cleanup();
   }
-  
+
   @Override
   protected void killIncoming() {
     incoming.kill();

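The shape introduced above recurs throughout this patch: the body of next() is bracketed by stats.startProcessing()/stats.stopProcessing() in a try/finally, and code-generation work is timed separately with startSetup()/stopSetup(). A minimal self-contained sketch of the idiom (SimpleStats is a stand-in for Drill's OperatorStats and models only the four methods the patch uses):

    public class TimingIdiomSketch {
      enum IterOutcome { OK, NONE }

      static class SimpleStats {
        private long procStart, procNanos, setupStart, setupNanos;
        void startProcessing() { procStart = System.nanoTime(); }
        void stopProcessing()  { procNanos += System.nanoTime() - procStart; }
        void startSetup()      { setupStart = System.nanoTime(); }
        void stopSetup()       { setupNanos += System.nanoTime() - setupStart; }
      }

      private final SimpleStats stats = new SimpleStats();

      public IterOutcome next() {
        stats.startProcessing();
        try {
          // ... operator work; early returns and thrown exceptions both
          // reach the finally block, so the clock always stops ...
          return IterOutcome.OK;
        } finally {
          stats.stopProcessing();
        }
      }
    }

Because stopProcessing() sits in a finally block, every return statement inside the switch above is accounted for without repeating the call on each path.
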
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5eec3bb..6b768c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,25 +17,16 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.record.*;
-import org.eigenbase.rel.JoinRelType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
-import com.sun.codemodel.JExpr;
-
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -44,6 +35,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
@@ -51,14 +43,29 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.JoinRelType;
+
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
 
 public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
+    private static final int LEFT_INPUT = 0;
+    private static final int RIGHT_INPUT = 1;
+
     // Probe side record batch
     private final RecordBatch left;
 
@@ -137,7 +144,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     @Override
     public IterOutcome next() {
-
+        stats.startProcessing();
         try {
             /* If we are here for the first time, execute the build phase of the
              * hash join and setup the run time generated class for the probe side
@@ -153,7 +160,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                  * as well, for the materialization to be successful. This batch will not be used
                  * till we complete the build phase.
                  */
-                leftUpstream = left.next();
+                leftUpstream = next(LEFT_INPUT, left);
 
                 // Build the hash table, using the build side record batches.
                 executeBuildPhase();
@@ -200,12 +207,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                     for (VectorWrapper<?> wrapper : left) {
                       wrapper.getValueVector().clear();
                     }
-                    leftUpstream = left.next();
+                    leftUpstream = next(LEFT_INPUT, left);
                     while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
                       for (VectorWrapper<?> wrapper : left) {
                         wrapper.getValueVector().clear();
                       }
-                      leftUpstream = left.next();
+                      leftUpstream = next(LEFT_INPUT, left);
                     }
                 }
             }
@@ -214,6 +221,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             return IterOutcome.NONE;
 
         } catch (ClassTransformationException | SchemaChangeException | IOException e) {
+            stats.stopProcessing();
             context.fail(e);
             killIncoming();
             return IterOutcome.STOP;
@@ -256,7 +264,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
 
         //Setup the underlying hash table
-        IterOutcome rightUpstream = right.next();
+        IterOutcome rightUpstream = next(RIGHT_INPUT, right);
 
         boolean moreData = true;
 
@@ -323,7 +331,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                     break;
             }
             // Get the next record batch
-            rightUpstream = right.next();
+            rightUpstream = next(RIGHT_INPUT, right);
         }
     }
 

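For a two-input operator the calls become next(LEFT_INPUT, left) and next(RIGHT_INPUT, right), so received batches and record counts can be attributed to a specific input. One thing worth noting in the hunks shown for this file: next() calls stats.startProcessing() at the top but stats.stopProcessing() only on the exception path, rather than in a finally block as StreamingAggBatch does, so the successful return paths do not appear to stop the clock. A sketch of per-input accounting keyed by these index constants (InputStats is hypothetical; the real counters live in OperatorStats):

    // Hypothetical per-input accounting keyed by the LEFT_INPUT/RIGHT_INPUT indices above.
    class InputStats {
      private final long[] batches = new long[2];
      private final long[] records = new long[2];

      void batchReceived(int inputIndex, long recordCount) {
        batches[inputIndex]++;
        records[inputIndex] += recordCount;
      }
    }
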
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index c07878a..faca32a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -35,6 +35,9 @@ public final class JoinStatus {
     INCOMING, SV4;
   }
 
+  private static final int LEFT_INPUT = 0;
+  private static final int RIGHT_INPUT = 1;
+
   public final RecordBatch left;
   private int leftPosition;
   private IterOutcome lastLeft;
@@ -63,10 +66,18 @@ public final class JoinStatus {
     this.joinType = output.getJoinType();
   }
 
+  private final IterOutcome nextLeft(){
+    return outputBatch.next(LEFT_INPUT, left);
+  }
+
+  private final IterOutcome nextRight(){
+    return outputBatch.next(RIGHT_INPUT, right);
+  }
+
   public final void ensureInitial(){
     if(!initialSet){
-      this.lastLeft = left.next();
-      this.lastRight = right.next();
+      this.lastLeft = nextLeft();
+      this.lastRight = nextRight();
       initialSet = true;
     }
   }
@@ -148,7 +159,7 @@ public final class JoinStatus {
     if (!isLeftPositionInCurrentBatch()) {
       leftPosition = 0;
       releaseData(left);
-      lastLeft = left.next();
+      lastLeft = nextLeft();
       return lastLeft == IterOutcome.OK;
     }
     lastLeft = IterOutcome.OK;
@@ -167,7 +178,7 @@ public final class JoinStatus {
     if (!isRightPositionInCurrentBatch()) {
       rightPosition = 0;
       releaseData(right);
-      lastRight = right.next();
+      lastRight = nextRight();
       return lastRight == IterOutcome.OK;
     }
     lastRight = IterOutcome.OK;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 48b7fea..46dea64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -66,9 +66,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-    
+
   public final MappingSet setupMapping =
-      new MappingSet("null", "null", 
+      new MappingSet("null", "null",
                      GM("doSetup", "doSetup", null, null),
                      GM("doSetup", "doSetup", null, null));
   public final MappingSet copyLeftMapping =
@@ -96,7 +96,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
                      GM("doSetup", "doCompareNextLeftKey", null, null),
                      GM("doSetup", "doCompareNextLeftKey", null, null));
 
-  
+
   private final RecordBatch left;
   private final RecordBatch right;
   private final JoinStatus status;
@@ -104,7 +104,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private final JoinRelType joinType;
   private JoinWorker worker;
   public MergeJoinBatchBuilder batchBuilder;
-  
+
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
     super(popConfig, context);
 
@@ -130,10 +130,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   @Override
   public IterOutcome next() {
-    
+
    // we do this here instead of in the constructor because we don't necessarily want to start consuming on construction.
     status.ensureInitial();
-    
+
     // loop so we can start over again if we find a new batch was created.
     while(true){
 
@@ -153,14 +153,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
        logger.debug("NO MORE DATA; returning NONE");
         return IterOutcome.NONE;
       }
-      
+
       boolean first = false;
       if(worker == null){
         try {
           logger.debug("Creating New Worker");
+          stats.startSetup();
           this.worker = generateNewWorker();
           first = true;
+          stats.stopSetup();
         } catch (ClassTransformationException | IOException | SchemaChangeException e) {
+          stats.stopSetup();
           context.fail(new SchemaChangeException(e));
           kill();
           return IterOutcome.STOP;
@@ -222,13 +225,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       left.cleanup();
       right.cleanup();
   }
-  
-  private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, 
+
+  private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
       JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) throws ClassTransformationException {
     boolean nextLeftIndexDeclared = false;
 
     cg.setMappingSet(compareLeftMapping);
-    
+
     for (JoinCondition condition : conditions) {
       final LogicalExpression leftFieldExpr = condition.getLeft();
 
@@ -242,52 +245,52 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       ////////////////////////////////
       cg.setMappingSet(compareLeftMapping);
       cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
-  
+
       if (!nextLeftIndexDeclared) {
         // int nextLeftIndex = leftIndex + 1;
         cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
         nextLeftIndexDeclared = true;
-      } 
+      }
       // check if the next key is in this batch
       cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
                        ._then()
                          ._return(JExpr.lit(-1));
-  
+
       // generate VV read expressions
       ClassGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
       cg.setMappingSet(compareNextLeftMapping); // change mapping from 'leftIndex' to 'nextLeftIndex'
       ClassGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
-  
+
       if (compareThisLeftExprHolder.isOptional()) {
         // handle null == null
         cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
                               .cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
                          ._then()
                            ._return(JExpr.lit(0));
-    
+
         // handle null == !null
         cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
                               .cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
                          ._then()
                            ._return(JExpr.lit(1));
       }
-  
+
       // check value equality
-  
+
       LogicalExpression gh = FunctionGenerationHelper.getComparator(compareThisLeftExprHolder,
         compareNextLeftExprHolder,
         context.getFunctionRegistry());
       HoldingContainer out = cg.addExpr(gh, false);
-      
-      //If not 0, it means not equal. We return this out value. 
+
+      //If not 0, it means not equal. We return this out value.
       JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
       jc._then()._return(out.getValue());
     }
-    
+
     //Pass the equality check for all the join conditions. Finally, return 0.
     cg.getEvalBlock()._return(JExpr.lit(0));
-  }  
-  
+  }
+
   private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
 
     final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -322,11 +325,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     //generate doCompare() method
     /////////////////////////////////////////
     generateDoCompare(cg, incomingRecordBatch, incomingLeftRecordBatch, incomingRightRecordBatch, collector);
-    
+
     //generate doCompareNextLeftKey() method
     /////////////////////////////////////////
     generateDoCompareNextLeft(cg, incomingRecordBatch, incomingLeftRecordBatch, joinStatus, collector);
-    
+
     // generate copyLeft()
     //////////////////////
     cg.setMappingSet(copyLeftMapping);
@@ -394,12 +397,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private void allocateBatch() {
     // allocate new batch space.
     container.clear();
-    
+
     //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
     int leftCount = status.isLeftPositionAllowed() ? left.getRecordCount() : 0;
     int rightCount = status.isRightPositionAllowed() ? right.getRecordCount() : 0;
     int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
-    
+
     // add fields from both batches
     if (leftCount > 0) {
 
@@ -436,11 +439,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     logger.debug("Built joined schema: {}", container.getSchema());
   }
 
-  private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, 
+  private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
       JVar incomingLeftRecordBatch, JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
-    
+
     cg.setMappingSet(compareMapping);
-    
+
     for (JoinCondition condition : conditions) {
       final LogicalExpression leftFieldExpr = condition.getLeft();
       final LogicalExpression rightFieldExpr = condition.getRight();
@@ -482,37 +485,37 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
             .cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
             ._then()
             ._return(JExpr.lit(0));
-    
+
         // handle null == !null
         cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0))
             .cor(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))))
             ._then()
             ._return(JExpr.lit(1));
-  
+
       } else if (compareLeftExprHolder.isOptional()) {
         // handle null == required (null is less than any value)
         cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)))
             ._then()
             ._return(JExpr.lit(-1));
-  
+
       } else if (compareRightExprHolder.isOptional()) {
         // handle required == null (null is less than any value)
         cg.getEvalBlock()._if(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))
             ._then()
             ._return(JExpr.lit(1));
       }
-  
+
       LogicalExpression fh = FunctionGenerationHelper.getComparator(compareLeftExprHolder,
         compareRightExprHolder,
-        context.getFunctionRegistry()); 
+        context.getFunctionRegistry());
       HoldingContainer out = cg.addExpr(fh, false);
-      
-      //If not 0, it means not equal. We return this out value.       
+
+      //If not 0, it means not equal. We return this out value.
       JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
       jc._then()._return(out.getValue());
     }
-    
-    //Pass the equality check for all the join conditions. Finally, return 0.    
-    cg.getEvalBlock()._return(JExpr.lit(0));  
+
+    //Pass the equality check for all the join conditions. Finally, return 0.
+    cg.getEvalBlock()._return(JExpr.lit(0));
   }
 }

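Here the worker code generation is timed with stats.startSetup()/stats.stopSetup(), with stopSetup() written on both the success path and in the catch block. The same block can be reshaped with a finally so the call appears only once (a fragment-level sketch of the code above, not a behavior change):

    stats.startSetup();
    try {
      this.worker = generateNewWorker();
      first = true;
    } catch (ClassTransformationException | IOException | SchemaChangeException e) {
      context.fail(new SchemaChangeException(e));
      kill();
      return IterOutcome.STOP;
    } finally {
      stats.stopSetup();   // runs after either branch above
    }
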
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 3d496d3..cc38cbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -27,8 +27,6 @@ import java.util.PriorityQueue;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.Order.Ordering;
@@ -41,19 +39,28 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.RelFieldCollation.Direction;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JArray;
 import com.sun.codemodel.JClass;
@@ -92,6 +99,15 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private List<VectorAllocator> allocators;
   private MergingReceiverPOP config;
 
+  public static enum Metric implements MetricDef{
+    NEXT_WAIT_NANOS;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
   public MergingRecordBatch(FragmentContext context,
                             MergingReceiverPOP config,
                             RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
@@ -104,8 +120,21 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.outgoingContainer = new VectorContainer();
   }
 
+  private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
+    long startNext = System.nanoTime();
+    RawFragmentBatch b = provider.getNext();
+    if(b != null){
+      stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+    }
+    stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext);
+    return b;
+  }
+
   @Override
   public IterOutcome next() {
+    stats.startProcessing();
+    try{
+
     if (fragProviders.length == 0) return IterOutcome.NONE;
     boolean schemaChanged = false;
 
@@ -131,7 +160,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       for (RawFragmentBatchProvider provider : fragProviders) {
         RawFragmentBatch rawBatch = null;
         try {
-          rawBatch = provider.getNext();
+          rawBatch = getNext(provider);
         } catch (IOException e) {
           context.fail(e);
           return IterOutcome.STOP;
@@ -238,10 +267,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         // reached the end of an incoming record batch
         RawFragmentBatch nextBatch = null;
         try {
-          nextBatch = fragProviders[node.batchId].getNext();
+          nextBatch = getNext(fragProviders[node.batchId]);
 
           while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
-            nextBatch = fragProviders[node.batchId].getNext();
+            nextBatch = getNext(fragProviders[node.batchId]);
           }
         } catch (IOException e) {
           context.fail(e);
@@ -301,6 +330,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       return IterOutcome.OK_NEW_SCHEMA;
     else
       return IterOutcome.OK;
+
+    }finally{
+      stats.stopProcessing();
+    }
+
   }
 
   @Override

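Two things land in this file: a Metric enum that derives its metric id from ordinal(), and a getNext() wrapper that charges time spent blocking on a RawFragmentBatchProvider to NEXT_WAIT_NANOS. A self-contained sketch of that pattern (MetricSink is hypothetical; the real stats object exposes addLongStat as used above):

    import java.util.concurrent.Callable;

    public class WaitMetricSketch {
      interface MetricDef { int metricId(); }

      enum Metric implements MetricDef {
        NEXT_WAIT_NANOS;
        public int metricId() { return ordinal(); }
      }

      interface MetricSink { void addLongStat(MetricDef metric, long value); }

      // Times a blocking call and records the wait, whether or not it throws.
      static <T> T timed(MetricSink sink, Callable<T> call) throws Exception {
        long start = System.nanoTime();
        try {
          return call.call();
        } finally {
          sink.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - start);
        }
      }
    }

Using ordinal() as the metric id keeps the mapping trivial, with the usual caveat that reordering or inserting enum constants later would silently renumber existing metrics.
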
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 4641de6..f105363 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -166,7 +166,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     recordsSampled += incoming.getRecordCount();
 
     outer: while (recordsSampled < recordsToSample) {
-      upstream = incoming.next();
+      upstream = next(incoming);
       switch (upstream) {
       case NONE:
       case NOT_YET:
@@ -414,97 +414,102 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
   @Override
   public IterOutcome next() {
-    container.zeroVectors();
-
-    // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
-    // done
-    if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
-      return IterOutcome.NONE;
-
-    // if there are batches on the queue, process them first, rather than calling incoming.next()
-    if (batchQueue != null && batchQueue.size() > 0) {
-      VectorContainer vc = batchQueue.poll();
-      recordCount = vc.getRecordCount();
-      try {
-
-        // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
-        setupNewSchema(vc);
-      } catch (SchemaChangeException ex) {
-        kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
+    stats.startProcessing();
+    try{
+      container.zeroVectors();
+
+      // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
+      // done
+      if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
+        return IterOutcome.NONE;
+
+      // if there are batches on the queue, process them first, rather than calling incoming.next()
+      if (batchQueue != null && batchQueue.size() > 0) {
+        VectorContainer vc = batchQueue.poll();
+        recordCount = vc.getRecordCount();
+        try {
+
+          // Must set up a new schema each time, because ValueVectors are not reused between containers in queue
+          setupNewSchema(vc);
+        } catch (SchemaChangeException ex) {
+          kill();
+          logger.error("Failure during query", ex);
+          context.fail(ex);
+          return IterOutcome.STOP;
+        }
+        doWork(vc);
+        vc.zeroVectors();
+        return IterOutcome.OK_NEW_SCHEMA;
       }
-      doWork(vc);
-      vc.zeroVectors();
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
 
-    // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
-    // more incoming
-    IterOutcome upstream = incoming.next();
+      // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
+      // more incoming
+      IterOutcome upstream = next(incoming);
 
-    if (this.first && upstream == IterOutcome.OK) {
-      throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
-    }
-
-    // If this is the first iteration, we need to generate the partition vectors before we can proceed
-    if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
-      if (!getPartitionVectors()){
-        cleanup();
-        return IterOutcome.STOP;
+      if (this.first && upstream == IterOutcome.OK) {
+        throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
       }
 
-      batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
-      first = false;
-
-      // Now that we have the partition vectors, we immediately process the first batch on the queue
-      VectorContainer vc = batchQueue.poll();
-      try {
-        setupNewSchema(vc);
-      } catch (SchemaChangeException ex) {
-        kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
+      // If this is the first iteration, we need to generate the partition vectors before we can proceed
+      if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
+        if (!getPartitionVectors()){
+          cleanup();
+          return IterOutcome.STOP;
+        }
+
+        batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
+        first = false;
+
+        // Now that we have the partition vectors, we immediately process the first batch on the queue
+        VectorContainer vc = batchQueue.poll();
+        try {
+          setupNewSchema(vc);
+        } catch (SchemaChangeException ex) {
+          kill();
+          logger.error("Failure during query", ex);
+          context.fail(ex);
+          return IterOutcome.STOP;
+        }
+        doWork(vc);
+        vc.zeroVectors();
+        recordCount = vc.getRecordCount();
+        return IterOutcome.OK_NEW_SCHEMA;
       }
-      doWork(vc);
-      vc.zeroVectors();
-      recordCount = vc.getRecordCount();
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
 
-    // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the
-    // first one
-    // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
-    if (this.startedUnsampledBatches == false) {
-      this.startedUnsampledBatches = true;
-      if (upstream == IterOutcome.OK)
-        upstream = IterOutcome.OK_NEW_SCHEMA;
-    }
-    switch (upstream) {
-    case NONE:
-    case NOT_YET:
-    case STOP:
-      cleanup();
-      recordCount = 0;
-      return upstream;
-    case OK_NEW_SCHEMA:
-      try {
-        setupNewSchema(incoming);
-      } catch (SchemaChangeException ex) {
-        kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
+      // Now that all the batches on the queue are processed, we begin processing the incoming batches. For the
+      // first one we need to generate a new schema, even if the outcome is IterOutcome.OK.
+      // After that we can reuse the schema.
+      if (this.startedUnsampledBatches == false) {
+        this.startedUnsampledBatches = true;
+        if (upstream == IterOutcome.OK)
+          upstream = IterOutcome.OK_NEW_SCHEMA;
+      }
+      switch (upstream) {
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        cleanup();
+        recordCount = 0;
+        return upstream;
+      case OK_NEW_SCHEMA:
+        try {
+          setupNewSchema(incoming);
+        } catch (SchemaChangeException ex) {
+          kill();
+          logger.error("Failure during query", ex);
+          context.fail(ex);
+          return IterOutcome.STOP;
+        }
+        // fall through.
+      case OK:
+        doWork(incoming);
+        recordCount = incoming.getRecordCount();
+        return upstream; // change if upstream changed, otherwise normal.
+      default:
+        throw new UnsupportedOperationException();
       }
-      // fall through.
-    case OK:
-      doWork(incoming);
-      recordCount = incoming.getRecordCount();
-      return upstream; // change if upstream changed, otherwise normal.
-    default:
-      throw new UnsupportedOperationException();
+    }finally{
+      stats.stopProcessing();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 844d6db..b6b4c33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,11 +20,10 @@ package org.apache.drill.exec.record;
 import java.util.Iterator;
 
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -36,12 +35,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected final T popConfig;
   protected final FragmentContext context;
   protected final OperatorContext oContext;
+  protected final OperatorStats stats;
 
   protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
     super();
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = new OperatorContext(popConfig, context);
+    this.stats = oContext.getStats();
   }
 
   @Override
@@ -58,6 +59,27 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return popConfig;
   }
 
+  public final IterOutcome next(RecordBatch b){
+    return next(0, b);
+  }
+
+  public final IterOutcome next(int inputIndex, RecordBatch b){
+    stats.stopProcessing();
+    IterOutcome next = b.next();
+
+    switch(next){
+    case OK_NEW_SCHEMA:
+      stats.batchReceived(inputIndex, b.getRecordCount(), true);
+      break;
+    case OK:
+      stats.batchReceived(inputIndex, b.getRecordCount(), false);
+      break;
+    }
+
+    stats.startProcessing();
+    return next;
+  }
+
   @Override
   public BatchSchema getSchema() {
     return container.getSchema();
@@ -75,6 +97,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     oContext.close();
   }
 
+
   @Override
   public SelectionVector2 getSelectionVector2() {
     throw new UnsupportedOperationException();

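The new next(int, RecordBatch) helper is what makes the timing additive across an operator tree: the parent pauses its own processing clock before invoking the child and resumes it once the child returns, so each operator reports only its own time rather than nested wall-clock time. An annotated restatement of the helper above:

    public final IterOutcome next(int inputIndex, RecordBatch b){
      stats.stopProcessing();        // pause this operator's clock...
      IterOutcome next = b.next();   // ...while the upstream operator times itself the same way

      switch(next){
      case OK_NEW_SCHEMA:
        stats.batchReceived(inputIndex, b.getRecordCount(), true);   // batch with a new schema
        break;
      case OK:
        stats.batchReceived(inputIndex, b.getRecordCount(), false);  // batch with the same schema
        break;
      }

      stats.startProcessing();       // resume before this operator consumes the batch
      return next;
    }

Outcomes such as NONE or STOP deliberately record nothing, since there is no batch to count.
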
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index dd2cfe0..13e4ac8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -29,7 +29,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   protected final RecordBatch incoming;
   private boolean first = true;
   protected boolean outOfMemory = false;
-  
+
   public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
@@ -42,36 +42,46 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
 
   @Override
   public IterOutcome next() {
-    IterOutcome upstream = incoming.next();
-    if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
-    first = false;
-    switch(upstream){
-    case NONE:
-    case NOT_YET:
-    case STOP:
-      return upstream;
-    case OUT_OF_MEMORY:
-      return upstream;
-    case OK_NEW_SCHEMA:
-      try{
-        setupNewSchema();
-      }catch(SchemaChangeException ex){
-        kill();
-        logger.error("Failure during query", ex);
-        context.fail(ex);
-        return IterOutcome.STOP;
-      }
-      // fall through.
-    case OK:
-      doWork();
-      if (outOfMemory) {
-        outOfMemory = false;
-        return IterOutcome.OUT_OF_MEMORY;
+    try{
+      stats.startProcessing();
+      IterOutcome upstream = next(incoming);
+      if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
+      first = false;
+      switch(upstream){
+      case NONE:
+      case NOT_YET:
+      case STOP:
+        return upstream;
+      case OUT_OF_MEMORY:
+        return upstream;
+      case OK_NEW_SCHEMA:
+        try{
+          stats.startSetup();
+          setupNewSchema();
+          stats.stopSetup();
+        }catch(SchemaChangeException ex){
+          stats.stopSetup();
+          kill();
+          logger.error("Failure during query", ex);
+          context.fail(ex);
+          return IterOutcome.STOP;
+        }
+        // fall through.
+      case OK:
+        doWork();
+        if (outOfMemory) {
+          outOfMemory = false;
+          return IterOutcome.OUT_OF_MEMORY;
+        }
+        return upstream; // change if upstream changed, otherwise normal.
+      default:
+        throw new UnsupportedOperationException();
       }
-      return upstream; // change if upstream changed, otherwise normal.
-    default:
-      throw new UnsupportedOperationException();
+    }finally{
+      stats.stopProcessing();
     }
+
+
   }
 
   @Override

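This follows the same bracketing as StreamingAggBatch, with one small ordering difference: stats.startProcessing() sits inside the try here, whereas StreamingAggBatch calls it before entering the try. Placing the call before the try guarantees the stopProcessing() in the finally always has a matching start, even in the unlikely event that startProcessing() itself throws (a fragment-level sketch):

    stats.startProcessing();   // before the try, so start/stop always pair up
    try {
      // ... body ...
    } finally {
      stats.stopProcessing();
    }
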
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 7f607a3..7297dc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,7 +22,6 @@ import java.io.Closeable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.DistributedMultiMap;
 import org.apache.drill.exec.cache.HazelCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
@@ -31,6 +30,8 @@ import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.service.ServiceEngine;
 import org.apache.drill.exec.work.WorkManager;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletHandler;
 
 import com.google.common.io.Closeables;
 
@@ -71,6 +72,7 @@ public class Drillbit implements Closeable{
   final DistributedCache cache;
   final WorkManager manager;
   final BootStrapContext context;
+  final Server embeddedJetty;
 
   private volatile RegistrationHandle handle;
 
@@ -89,8 +91,17 @@ public class Drillbit implements Closeable{
       this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler());
       this.cache = new HazelCache(config, context.getAllocator());
     }
+    this.embeddedJetty = new Server(474747);
   }
 
+  private void setupJetty(){
+    ServletHandler handler = new ServletHandler();
+    embeddedJetty.setHandler(handler);
+
+  }
+
+
+
   public void run() throws Exception {
     coord.start(10000);
     DrillbitEndpoint md = engine.start();
@@ -99,6 +110,7 @@ public class Drillbit implements Closeable{
     manager.getContext().getStorage().init();
     manager.getContext().getOptionManager().init();
     handle = coord.register(md);
+    embeddedJetty.start();
   }
 
   public void close() {
@@ -109,7 +121,11 @@ public class Drillbit implements Closeable{
     } catch (InterruptedException e) {
       logger.warn("Interrupted while sleeping during coordination deregistration.");
     }
-
+    try {
+      embeddedJetty.stop();
+    } catch (Exception e) {
+      logger.warn("Failure while shutting down embedded jetty server.");
+    }
     Closeables.closeQuietly(engine);
     Closeables.closeQuietly(coord);
     Closeables.closeQuietly(manager);

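On the embedded web server: new Server(474747) names a port outside the valid TCP range (1-65535), so embeddedJetty.start() would fail to bind; presumably this is a placeholder until a real port is chosen. Note also that setupJetty() is defined but not yet called from run(). A minimal working sketch of the same wiring (the servlet registration line is hypothetical):

    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.servlet.ServletHandler;

    public class EmbeddedJettySketch {
      public static void main(String[] args) throws Exception {
        Server server = new Server(8047);               // any valid port; 8047 is illustrative
        ServletHandler handler = new ServletHandler();
        server.setHandler(handler);
        // handler.addServletWithMapping(StatusServlet.class, "/status");  // hypothetical servlet
        server.start();
        server.join();
      }
    }
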
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 2965e79..718da23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
@@ -65,7 +66,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   private final String name;
   protected final CompressionCodecFactory codecFactory;
   private final boolean compressible;
-  
+
   protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
                              T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
     this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
@@ -80,7 +81,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     this.name = name == null ? defaultName : name;
     this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getUnderlying().getConf()));
   }
-  
+
   @Override
   public DrillFileSystem getFileSystem() {
     return fs;
@@ -90,7 +91,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   public DrillbitContext getContext() {
     return context;
   }
-  
+
   @Override
   public String getName() {
     return name;
@@ -99,7 +100,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   /**
    * Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will
    * only split on file boundaries.
-   * 
+   *
    * @return True if splittable.
    */
   public boolean isBlockSplittable() {
@@ -184,7 +185,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
     return new EasyGroupScan(selection, this, columns, selection.selectionRoot);
   }
-  
+
   @Override
   public FormatPluginConfig getConfig() {
     return formatConfig;
@@ -214,5 +215,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {
     return ImmutableSet.of();
   }
-  
+
+  public abstract int getReaderOperatorType();
+  public abstract int getWriterOperatorType();
+
 }

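The two new abstract methods make every easy-format plugin declare operator type ids for its reader and writer, which the sub-scan and writer operators surface via getOperatorType() (the ids come from the UserBitShared.CoreOperatorType protobuf enum, per the import added above). A sketch of what a concrete plugin supplies; the constant name is illustrative, not taken from the patch:

    @Override
    public int getReaderOperatorType() {
      return CoreOperatorType.JSON_SUB_SCAN_VALUE;  // hypothetical constant name
    }

    @Override
    public int getWriterOperatorType() {
      throw new UnsupportedOperationException("this format does not support writing");
    }
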
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 0b3fe0f..5f9226e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
 import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
@@ -45,13 +46,13 @@ public class EasySubScan extends AbstractSubScan{
   private final EasyFormatPlugin<?> formatPlugin;
   private final List<SchemaPath> columns;
   private String selectionRoot;
-  
+
   @JsonCreator
   public EasySubScan(
       @JsonProperty("files") List<FileWorkImpl> files, //
       @JsonProperty("storage") StoragePluginConfig storageConfig, //
       @JsonProperty("format") FormatPluginConfig formatConfig, //
-      @JacksonInject StoragePluginRegistry engineRegistry, // 
+      @JacksonInject StoragePluginRegistry engineRegistry, //
       @JsonProperty("columns") List<SchemaPath> columns, //
       @JsonProperty("selectionRoot") String selectionRoot
       ) throws IOException, ExecutionSetupException {
@@ -62,7 +63,7 @@ public class EasySubScan extends AbstractSubScan{
     this.columns = columns;
     this.selectionRoot = selectionRoot;
   }
-  
+
   public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns, String selectionRoot){
     this.formatPlugin = plugin;
     this.files = files;
@@ -74,7 +75,7 @@ public class EasySubScan extends AbstractSubScan{
   public String getSelectionRoot() {
     return selectionRoot;
   }
-  
+
   @JsonIgnore
   public EasyFormatPlugin<?> getFormatPlugin(){
     return formatPlugin;
@@ -100,11 +101,15 @@ public class EasySubScan extends AbstractSubScan{
       return formatPlugin.getConfig();
     }
   }
-  
+
   @JsonProperty("columns")
   public List<SchemaPath> getColumns(){
     return columns;
   }
 
-   
+  @Override
+  public int getOperatorType() {
+    return formatPlugin.getReaderOperatorType();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index 864ae48..5ca781b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -94,4 +94,9 @@ public class EasyWriter extends AbstractWriter {
     // TODO:
     return new OperatorCost(1,1,1,1);
   }
+
+  @Override
+  public int getOperatorType() {
+    return formatPlugin.getWriterOperatorType();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 0c50898..89694f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.direct;
 
 import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.RecordReader;
 
 public class DirectSubScan extends AbstractSubScan{
@@ -34,4 +35,10 @@ public class DirectSubScan extends AbstractSubScan{
     return reader;
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.DIRECT_SUB_SCAN_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 04a9768..e410306 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -22,12 +22,14 @@ import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Lists;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
@@ -80,4 +82,16 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
     }
 
   }
+
+  @Override
+  public int getReaderOperatorType() {
+    return CoreOperatorType.JSON_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    throw new UnsupportedOperationException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index f6cc58e..cd28d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
@@ -71,7 +72,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
 
   @Override
   public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project? 
+    return new EasyGroupScan(selection, this, null, selection.selectionRoot); // TODO: does the text format support projection?
   }
 
   @Override
@@ -127,6 +128,16 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
         return true;
       return false;
     }
-    
+
+  }
+
+  @Override
+  public int getReaderOperatorType() {
+    return CoreOperatorType.TEXT_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public int getWriterOperatorType() {
+    return CoreOperatorType.TEXT_WRITER_VALUE;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 7f2d0f1..ecd952c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -25,11 +25,13 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteStreams;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -144,4 +146,9 @@ public class HiveSubScan extends AbstractBase implements SubScan {
   public Iterator<PhysicalOperator> iterator() {
     return Iterators.emptyIterator();
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.HIVE_SUB_SCAN_VALUE;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index dce4d3b..70e1258 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -18,16 +18,17 @@
 package org.apache.drill.exec.store.ischema;
 
 import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class InfoSchemaSubScan extends AbstractSubScan{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaSubScan.class);
-  
+
 
   private final SelectedTable table;
-  
+
   @JsonCreator
   public InfoSchemaSubScan(@JsonProperty("table") SelectedTable table) {
     this.table = table;
@@ -36,6 +37,10 @@ public class InfoSchemaSubScan extends AbstractSubScan{
   public SelectedTable getTable() {
     return table;
   }
-  
-  
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.INFO_SCHEMA_SUB_SCAN_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
index f616bca..869e40c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.base.AbstractStore;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Store;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -51,7 +52,7 @@ public class MockStorePOP extends AbstractStore {
 
   @Override
   public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    
+
   }
 
   @Override
@@ -69,7 +70,10 @@ public class MockStorePOP extends AbstractStore {
     return new MockStorePOP(child);
   }
 
+  @Override
+  public int getOperatorType() {
+    throw new UnsupportedOperationException();
+  }
 
-  
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
index 0753be5..517ad3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -86,7 +87,7 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
   // will want to replace these two methods with an interface above for AbstractSubScan
   @Override
   public boolean isExecutable() {
-    return true;  
+    return true;
   }
 
   @Override
@@ -103,4 +104,9 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
 
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.MOCK_SUB_SCAN_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index dd5c91c..1f66e9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -140,4 +141,9 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return columns;
   }
 
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 51e9219..762942d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -19,12 +19,14 @@ package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.*;
 import com.google.common.base.Preconditions;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
 import java.io.IOException;
@@ -88,4 +90,10 @@ public class ParquetWriter extends AbstractWriter {
     // TODO:
     return new OperatorCost(1,1,1,1);
   }
+
+  @Override
+  public int getOperatorType() {
+    return CoreOperatorType.PARQUET_WRITER_VALUE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 36b7509..3dbb98e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -99,7 +99,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     this.queryRequest = queryRequest;
     this.context = new QueryContext(connection.getSession(), queryId, dContext);
     this.initiatingClient = connection;
-    this.fragmentManager = new QueryManager(new ForemanManagerListener(), dContext.getController());
+    this.fragmentManager = new QueryManager(bee.getContext().getCache(), new ForemanManagerListener(), dContext.getController());
     this.bee = bee;
 
     this.state = new AtomicState<QueryState>(QueryState.PENDING) {
@@ -168,7 +168,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
    * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
    */
   public void run() {
-    
+
     final String originalThread = Thread.currentThread().getName();
     Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
     // convert a run query request into action

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
new file mode 100644
index 0000000..509000f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -0,0 +1,43 @@
+package org.apache.drill.exec.work.foreman;
+
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+public class FragmentData {
+  private final boolean isLocal;
+  private volatile FragmentStatus status;
+  private volatile long lastStatusUpdate = 0;
+  private final DrillbitEndpoint endpoint;
+
+  public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
+    this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
+    this.endpoint = endpoint;
+    this.isLocal = isLocal;
+  }
+
+  public void setStatus(FragmentStatus status){
+    this.status = status;
+    lastStatusUpdate = System.currentTimeMillis();
+  }
+
+  public FragmentStatus getStatus() {
+    return status;
+  }
+
+  public boolean isLocal() {
+    return isLocal;
+  }
+
+  public long getLastStatusUpdate() {
+    return lastStatusUpdate;
+  }
+
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+
+}
\ No newline at end of file

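FragmentData starts every fragment in the SENDING state and timestamps each
overwrite, so staleness can later be derived from the last update. Roughly
(handle, endpoint and the incoming status object come from the surrounding
QueryManager context):

    FragmentData data = new FragmentData(handle, endpoint, false); // remote fragment
    // when a status message arrives from the executing drillbit:
    data.setStatus(newStatus);  // replaces the protobuf snapshot wholesale
    long sinceUpdateMs = System.currentTimeMillis() - data.getLastStatusUpdate();
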
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 01b0df8..c9c769a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -20,17 +20,13 @@ package org.apache.drill.exec.work.foreman;
 import io.netty.buffer.ByteBuf;
 
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
-import org.apache.drill.exec.physical.impl.ImplCreator;
-import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -51,28 +47,26 @@ import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 
-import com.google.common.collect.Maps;
-
 /**
- * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.  
+ * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.
  */
 public class QueryManager implements FragmentStatusListener{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
-  
-  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap(); // doesn't need to be thread safe as map is generated in a single thread and then accessed by multiple threads for reads only.
+
+  private final QueryStatus status;
   private final Controller controller;
   private ForemanManagerListener foreman;
   private AtomicInteger remainingFragmentCount;
   private WorkEventBus workBus;
   private FragmentExecutor rootRunner;
   private volatile QueryId queryId;
-  
-  public QueryManager(ForemanManagerListener foreman, Controller controller) {
+
+  public QueryManager(DistributedCache cache, ForemanManagerListener foreman, Controller controller) {
     super();
     this.foreman = foreman;
     this.controller = controller;
     this.remainingFragmentCount = new AtomicInteger(0);
-    
+    this.status = new QueryStatus(cache);
   }
 
   public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
@@ -90,44 +84,44 @@ public class QueryManager implements FragmentStatusListener{
       logger.debug("Setting buffers on root context.");
       rootContext.setBuffers(buffers);
       // add fragment to local node.
-      map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+      status.add(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
       logger.debug("Fragment added to local node.");
       rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment));
       RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
-      
+
       if(buffers.isDone()){
         // if we don't have to wait for any incoming data, start the fragment runner.
         bee.addFragmentRunner(fragmentManager.getRunnable());
       }else{
         // if we do, record the fragment manager in the workBus.
-        workBus.setRootFragmentManager(fragmentManager);  
+        workBus.setRootFragmentManager(fragmentManager);
       }
-      
-      
+
+
     }
 
     // keep track of intermediate fragments (not root or leaf)
     for (PlanFragment f : intermediateFragments) {
       logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
-      map.put(f.getHandle(), new FragmentData(f.getHandle(), f.getAssignment(), false));
+      status.add(f.getHandle(), new FragmentData(f.getHandle(), f.getAssignment(), false));
     }
 
     // send remote (leaf) fragments.
     for (PlanFragment f : leafFragments) {
       sendRemoteFragment(f);
     }
-    
+
     logger.debug("Fragment runs setup is complete.");
   }
-    
+
   private void sendRemoteFragment(PlanFragment fragment){
     logger.debug("Sending remote fragment to node {} with data {}", fragment.getAssignment(), fragment.getFragmentJson());
-    map.put(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
+    status.add(fragment.getHandle(), new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
     FragmentSubmitListener listener = new FragmentSubmitListener(fragment.getAssignment(), fragment);
     controller.getTunnel(fragment.getAssignment()).sendFragment(listener, fragment);
   }
-  
-  
+
+
   @Override
   public void statusUpdate(FragmentStatus status) {
     logger.debug("New fragment status was provided to Foreman of {}", status);
@@ -151,11 +145,11 @@ public class QueryManager implements FragmentStatusListener{
       throw new UnsupportedOperationException();
     }
   }
-  
+
   private void updateStatus(FragmentStatus status){
-    map.get(status.getHandle()).setStatus(status);
+    this.status.update(status);
   }
-  
+
   private void finished(FragmentStatus status){
     updateStatus(status);
     int remaining = remainingFragmentCount.decrementAndGet();
@@ -167,15 +161,15 @@ public class QueryManager implements FragmentStatusListener{
       foreman.cleanupAndSendResult(result);
     }
   }
-  
+
   private void fail(FragmentStatus status){
     updateStatus(status);
     stopQuery();
     QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
     foreman.cleanupAndSendResult(result);
   }
- 
-  
+
+
   private void stopQuery(){
     // Stop all queries with a currently active status.
 //    for(FragmentData data: map.values()){
@@ -195,13 +189,13 @@ public class QueryManager implements FragmentStatusListener{
 //      }
 //    }
   }
-  
+
   public void cancel(){
     stopQuery();
   }
 
   private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
-    
+
     public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
       super(endpoint, handle);
     }
@@ -220,15 +214,13 @@ public class QueryManager implements FragmentStatusListener{
     }
 
   };
-  
+
   public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){
     return new FragmentSubmitListener(endpoint, value);
   }
-  
-  
-  
+
   private class FragmentSubmitListener extends EndpointListener<Ack, PlanFragment>{
-    
+
     public FragmentSubmitListener(DrillbitEndpoint endpoint, PlanFragment value) {
       super(endpoint, value);
     }
@@ -240,44 +232,6 @@ public class QueryManager implements FragmentStatusListener{
     }
 
   }
-  
-  
-  private class FragmentData{
-    private final boolean isLocal;
-    private volatile FragmentStatus status;
-    private volatile long lastStatusUpdate = 0;
-    private final DrillbitEndpoint endpoint;
-    
-    public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
-      super();
-      this.status = FragmentStatus.newBuilder().setHandle(handle).setState(FragmentState.SENDING).build();
-      this.endpoint = endpoint;
-      this.isLocal = isLocal;
-    }
-    
-    public void setStatus(FragmentStatus status){
-      this.status = status;
-      lastStatusUpdate = System.currentTimeMillis();
-    }
-
-    public FragmentStatus getStatus() {
-      return status;
-    }
-
-    public boolean isLocal() {
-      return isLocal;
-    }
-
-    public long getLastStatusUpdate() {
-      return lastStatusUpdate;
-    }
-
-    public DrillbitEndpoint getEndpoint() {
-      return endpoint;
-    }
-    
-    
-  }
 
   private class RootStatusHandler extends AbstractStatusReporter{
 
@@ -290,7 +244,7 @@ public class QueryManager implements FragmentStatusListener{
       QueryManager.this.statusUpdate(status);
     }
 
-    
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
new file mode 100644
index 0000000..09858d5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -0,0 +1,33 @@
+package org.apache.drill.exec.work.foreman;
+
+import java.util.Map;
+
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import com.google.common.collect.Maps;
+
+public class QueryStatus {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
+
+  // populated by a single thread during fragment setup and only read by other
+  // threads afterwards, so a plain HashMap is sufficient
+  public Map<FragmentHandle, FragmentData> map = Maps.newHashMap();
+
+  private String queryId;
+
+  public QueryStatus(DistributedCache cache){
+    // touch the distributed multimap so it exists for this query; the cache
+    // is not otherwise used yet
+    cache.getMultiMap(QueryStatus.class);
+  }
+
+  public void setQueryId(QueryId id){
+    this.queryId = QueryIdHelper.getQueryId(id);
+  }
+
+  void add(FragmentHandle handle, FragmentData data){
+    if(map.put(handle, data) != null) throw new IllegalStateException("duplicate fragment handle: " + handle);
+  }
+
+  void update(FragmentStatus status){
+    map.get(status.getHandle()).setStatus(status);
+  }
+}

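Taken together with the QueryManager changes above, the bookkeeping flow is:
build one QueryStatus per query from the drillbit's DistributedCache, register
each fragment exactly once, then route every later status message through it.
A sketch (cache, fragment and incomingStatus come from the surrounding
Foreman/QueryManager code; add and update are package-private, so this runs
inside org.apache.drill.exec.work.foreman):

    QueryStatus status = new QueryStatus(cache);
    status.add(fragment.getHandle(),
        new FragmentData(fragment.getHandle(), fragment.getAssignment(), false));
    // each heartbeat then replaces the stored snapshot, keyed by handle:
    status.update(incomingStatus);
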
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5472140a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
deleted file mode 100644
index c75808d..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PerformanceTests.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl;
-
-import com.google.caliper.runner.CaliperMain;
-
-
-public class PerformanceTests {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PerformanceTests.class);
-
-
-  public static void main(String[] args){
-    CaliperMain.main(TestExecutionAbstractions.class, args);
-    System.out.println("Hello");
-  }
-}