You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/16 23:31:24 UTC

[15/32] git commit: Fix and improve runtime stats profiles - Stop stats processing while waiting for next. - Fix stats collection in PartitionSender and ScanBatch - Add stats to all senders - Add wait time to operator profile.

Fix and improve runtime stats profiles
- Stop the processing timer while an operator is waiting on next() from its input.
- Fix stats collection in PartitionSender and ScanBatch.
- Add stats to all senders.
- Add wait time (waitNanos) to the operator profile.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fc1a7778
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fc1a7778
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fc1a7778

Branch: refs/heads/master
Commit: fc1a7778e2af3b07117f99070530dd5a296ebc6d
Parents: 49a9ff2
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Jun 13 13:14:12 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jun 16 08:04:43 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/OperatorStats.java    |  20 ++-
 .../drill/exec/physical/impl/BaseRootExec.java  |  36 +++-
 .../physical/impl/RandomReceiverCreator.java    |   2 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  58 +++---
 .../drill/exec/physical/impl/ScreenCreator.java |  39 ++--
 .../exec/physical/impl/SingleSenderCreator.java |  29 ++-
 .../exec/physical/impl/WireRecordBatch.java     |  25 ++-
 .../BroadcastSenderRootExec.java                |  28 ++-
 .../impl/mergereceiver/MergingRecordBatch.java  |  15 +-
 .../PartitionSenderRootExec.java                |  18 +-
 .../partitionsender/PartitionerTemplate.java    |  14 +-
 .../drill/exec/record/AbstractRecordBatch.java  |   2 +
 .../drill/exec/server/rest/ProfileWrapper.java  | 177 ++++++++++++-------
 .../drill/exec/proto/SchemaUserBitShared.java   |   7 +
 .../apache/drill/exec/proto/UserBitShared.java  | 138 ++++++++++++---
 .../drill/exec/proto/beans/OperatorProfile.java |  22 +++
 protocol/src/main/protobuf/UserBitShared.proto  |   1 +
 17 files changed, 466 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 4ac8f74..4afea7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -41,12 +41,15 @@ public class OperatorStats {
 
   private boolean inProcessing = false;
   private boolean inSetup = false;
+  private boolean inWait = false;
 
   protected long processingNanos;
   protected long setupNanos;
+  protected long waitNanos;
 
   private long processingMark;
   private long setupMark;
+  private long waitMark;
 
   private long schemas;
 
@@ -89,6 +92,20 @@ public class OperatorStats {
     inProcessing = false;
   }
 
+  public void startWait() {
+    assert !inWait;
+    stopProcessing();
+    inWait = true;
+    waitMark = System.nanoTime();
+  }
+
+  public void stopWait() {
+    assert inWait;
+    startProcessing();
+    waitNanos += System.nanoTime() - waitMark;
+    inWait = false;
+  }
+
   public void batchReceived(int inputIndex, long records, boolean newSchema) {
     recordsReceivedByInput[inputIndex] += records;
     batchesReceivedByInput[inputIndex]++;
@@ -103,7 +120,8 @@ public class OperatorStats {
         .setOperatorType(operatorType) //
         .setOperatorId(operatorId) //
         .setSetupNanos(setupNanos) //
-        .setProcessNanos(processingNanos);
+        .setProcessNanos(processingNanos)
+        .setWaitNanos(waitNanos);
 
     addAllMetrics(b);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 256c106..452052b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,11 +17,25 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public abstract class BaseRootExec implements RootExec {
 
-  protected OperatorStats stats = null;
+  protected SenderStats stats = null;
+  protected OperatorContext oContext = null;
+
+  public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException {
+    this.stats = new SenderStats(config);
+    context.getStats().addOperatorStats(this.stats);
+    this.oContext = new OperatorContext(config, context, stats);
+  }
 
   @Override
   public final boolean next() {
@@ -35,8 +49,24 @@ public abstract class BaseRootExec implements RootExec {
     }
   }
 
-  public void setStats(OperatorStats stats) {
-    this.stats = stats;
+  public final IterOutcome next(RecordBatch b){
+    stats.stopProcessing();
+    IterOutcome next;
+    try {
+      next = b.next();
+    } finally {
+      stats.startProcessing();
+    }
+
+    switch(next){
+      case OK_NEW_SCHEMA:
+        stats.batchReceived(0, b.getRecordCount(), true);
+        break;
+      case OK:
+        stats.batchReceived(0, b.getRecordCount(), false);
+        break;
+    }
+    return next;
   }
 
   public abstract boolean innerNext();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
index 966c221..4ff5831 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
@@ -39,7 +39,7 @@ public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
     RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
     assert buffers.length == 1;
     RawBatchBuffer buffer = buffers[0];
-    return new WireRecordBatch(context, buffer);
+    return new WireRecordBatch(context, buffer, receiver);
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d142ff8..55d3f62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -127,34 +127,44 @@ public class ScanBatch implements RecordBatch {
 
   @Override
   public IterOutcome next() {
-    mutator.allocate(MAX_RECORD_CNT);
-    while ((recordCount = currentReader.next()) == 0) {
-      try {
-        if (!readers.hasNext()) {
-          currentReader.cleanup();
+    oContext.getStats().startProcessing();
+    try {
+      mutator.allocate(MAX_RECORD_CNT);
+      while ((recordCount = currentReader.next()) == 0) {
+        try {
+          if (!readers.hasNext()) {
+            currentReader.cleanup();
+            releaseAssets();
+            return IterOutcome.NONE;
+          }
+          oContext.getStats().startSetup();
+          try {
+            currentReader.cleanup();
+            currentReader = readers.next();
+            partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+            currentReader.setup(mutator);
+            mutator.allocate(MAX_RECORD_CNT);
+            addPartitionVectors();
+          } finally {
+            oContext.getStats().stopSetup();
+          }
+        } catch (ExecutionSetupException e) {
+          this.context.fail(e);
           releaseAssets();
-          return IterOutcome.NONE;
+          return IterOutcome.STOP;
         }
-        currentReader.cleanup();
-        currentReader = readers.next();
-        partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
-        currentReader.setup(mutator);
-        mutator.allocate(MAX_RECORD_CNT);
-        addPartitionVectors();
-      } catch (ExecutionSetupException e) {
-        this.context.fail(e);
-        releaseAssets();
-        return IterOutcome.STOP;
       }
-    }
 
-    populatePartitionVectors();
-    if (mutator.isNewSchema()) {
-      container.buildSchema(SelectionVectorMode.NONE);
-      schema = container.getSchema();
-      return IterOutcome.OK_NEW_SCHEMA;
-    } else {
-      return IterOutcome.OK;
+      populatePartitionVectors();
+      if (mutator.isNewSchema()) {
+        container.buildSchema(SelectionVectorMode.NONE);
+        schema = container.getSchema();
+        return IterOutcome.OK_NEW_SCHEMA;
+      } else {
+        return IterOutcome.OK;
+      }
+    } finally {
+      oContext.getStats().stopProcessing();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 643552b..86e77d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -21,7 +21,10 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -44,14 +47,14 @@ public class ScreenCreator implements RootCreator<Screen>{
 
 
   @Override
-  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkNotNull(children);
     Preconditions.checkArgument(children.size() == 1);
-    return new ScreenRoot(context, children.iterator().next());
+    return new ScreenRoot(context, children.iterator().next(), config);
   }
 
 
-  static class ScreenRoot implements RootExec{
+  static class ScreenRoot extends BaseRootExec {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     volatile boolean ok = true;
 
@@ -62,9 +65,9 @@ public class ScreenCreator implements RootCreator<Screen>{
     final UserClientConnection connection;
     private RecordMaterializer materializer;
 
-    public ScreenRoot(FragmentContext context, RecordBatch incoming){
+    public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+      super(context, config);
       assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
-
       this.context = context;
       this.incoming = incoming;
       this.connection = context.getConnection();
@@ -72,14 +75,14 @@ public class ScreenCreator implements RootCreator<Screen>{
 
 
     @Override
-    public boolean next() {
+    public boolean innerNext() {
       if(!ok){
         stop();
         context.fail(this.listener.ex);
         return false;
       }
 
-      IterOutcome outcome = incoming.next();
+      IterOutcome outcome = next(incoming);
 //      logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
@@ -92,7 +95,12 @@ public class ScreenCreator implements RootCreator<Screen>{
               .setIsLastChunk(true) //
               .build();
           QueryWritableBatch batch = new QueryWritableBatch(header);
-          connection.sendResult(listener, batch);
+          stats.startWait();
+          try {
+            connection.sendResult(listener, batch);
+          } finally {
+            stats.stopWait();
+          }
           sendCount.increment();
 
           return false;
@@ -107,7 +115,12 @@ public class ScreenCreator implements RootCreator<Screen>{
             .setIsLastChunk(true) //
             .build();
         QueryWritableBatch batch = new QueryWritableBatch(header);
-        connection.sendResult(listener, batch);
+        stats.startWait();
+        try {
+          connection.sendResult(listener, batch);
+        } finally {
+          stats.stopWait();
+        }
         sendCount.increment();
 
         return false;
@@ -119,7 +132,12 @@ public class ScreenCreator implements RootCreator<Screen>{
 //        context.getStats().batchesCompleted.inc(1);
 //        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
         QueryWritableBatch batch = materializer.convertNext(false);
-        connection.sendResult(listener, batch);
+        stats.startWait();
+        try {
+          connection.sendResult(listener, batch);
+        } finally {
+          stats.stopWait();
+        }
         sendCount.increment();
 
         return true;
@@ -131,6 +149,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     @Override
     public void stop() {
       sendCount.waitForSendComplete();
+      oContext.close();
       incoming.cleanup();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 7679701..9e91468 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,7 +22,10 @@ import io.netty.buffer.ByteBuf;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -44,7 +47,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
   
   
   
-  private static class SingleSenderRootExec implements RootExec{
+  private static class SingleSenderRootExec extends BaseRootExec {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
     private RecordBatch incoming;
     private DataTunnel tunnel;
@@ -53,8 +56,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private FragmentContext context;
     private volatile boolean ok = true;
     private final SendingAccountor sendCount = new SendingAccountor();
-    
-    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
+
+    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+      super(context, config);
       this.incoming = batch;
       assert(incoming != null);
       this.handle = context.getHandle();
@@ -65,27 +69,37 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
     
     @Override
-    public boolean next() {
+    public boolean innerNext() {
       if(!ok){
         incoming.kill();
         
         return false;
       }
-      IterOutcome out = incoming.next();
+      IterOutcome out = next(incoming);
 //      logger.debug("Outcome of sender next {}", out);
       switch(out){
       case STOP:
       case NONE:
         FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
         sendCount.increment();
-        tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+        } finally {
+          stats.stopWait();
+        }
         return false;
 
       case OK_NEW_SCHEMA:
       case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         sendCount.increment();
-        tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+        } finally {
+          stats.stopWait();
+        }
         return true;
 
       case NOT_YET:
@@ -98,6 +112,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     public void stop() {
       ok = false;
       sendCount.waitForSendComplete();
+      oContext.close();
       incoming.cleanup();
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index c7fc813..bc2cdb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -24,6 +24,9 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -43,14 +46,16 @@ public class WireRecordBatch implements RecordBatch {
   private RawFragmentBatchProvider fragProvider;
   private FragmentContext context;
   private BatchSchema schema;
+  private OperatorStats stats;
 
 
-  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException {
+  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException {
     this.fragProvider = fragProvider;
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
     // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
     this.batchLoader = new RecordBatchLoader(context.getAllocator());
+    this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0));
   }
 
   @Override
@@ -100,14 +105,22 @@ public class WireRecordBatch implements RecordBatch {
 
   @Override
   public IterOutcome next() {
+    stats.startProcessing();
     try{
-      RawFragmentBatch batch = fragProvider.getNext();
-
-      // skip over empty batches. we do this since these are basically control messages.
-      while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+      RawFragmentBatch batch;
+      try {
+        stats.startWait();
         batch = fragProvider.getNext();
+
+        // skip over empty batches. we do this since these are basically control messages.
+        while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+          batch = fragProvider.getNext();
+        }
+      } finally {
+        stats.stopWait();
       }
 
+
       if (batch == null){
         batchLoader.clear();
         return IterOutcome.NONE;
@@ -133,6 +146,8 @@ public class WireRecordBatch implements RecordBatch {
     }catch(SchemaChangeException | IOException ex){
       context.fail(ex);
       return IterOutcome.STOP;
+    } finally {
+      stats.stopProcessing();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 9c55825..a70cd50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,12 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.config.BroadcastSender;
+import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -43,7 +47,7 @@ import org.apache.drill.exec.work.ErrorHelper;
  * This is useful in cases such as broadcast join where sending the entire table to join
  * to all nodes is cheaper than merging and computing all the joins in the same node.
  */
-public class BroadcastSenderRootExec implements RootExec {
+public class BroadcastSenderRootExec extends BaseRootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
   private final FragmentContext context;
   private final BroadcastSender config;
@@ -54,7 +58,8 @@ public class BroadcastSenderRootExec implements RootExec {
 
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
-                                 BroadcastSender config) {
+                                 BroadcastSender config) throws OutOfMemoryException {
+    super(context, config);
     this.ok = true;
     this.context = context;
     this.incoming = incoming;
@@ -69,20 +74,25 @@ public class BroadcastSenderRootExec implements RootExec {
   }
 
   @Override
-  public boolean next() {
+  public boolean innerNext() {
     if(!ok) {
       context.fail(statusHandler.ex);
       return false;
     }
 
-    RecordBatch.IterOutcome out = incoming.next();
+    RecordBatch.IterOutcome out = next(incoming);
     logger.debug("Outcome of sender next {}", out);
     switch(out){
       case STOP:
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
-          tunnels[i].sendRecordBatch(this.statusHandler, b2);
+          stats.startWait();
+          try {
+            tunnels[i].sendRecordBatch(this.statusHandler, b2);
+          } finally {
+            stats.stopWait();
+          }
           statusHandler.sendCount.increment();
         }
 
@@ -96,7 +106,12 @@ public class BroadcastSenderRootExec implements RootExec {
         }
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
-          tunnels[i].sendRecordBatch(this.statusHandler, batch);   
+          stats.startWait();
+          try {
+            tunnels[i].sendRecordBatch(this.statusHandler, batch);
+          } finally {
+            stats.stopWait();
+          }
           statusHandler.sendCount.increment();
         }
 
@@ -135,6 +150,7 @@ public class BroadcastSenderRootExec implements RootExec {
   public void stop() {
       ok = false;
       statusHandler.sendCount.waitForSendComplete();
+      oContext.close();
       incoming.cleanup();
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 25ee667..a5d80b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -119,13 +119,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
-    long startNext = System.nanoTime();
-    RawFragmentBatch b = provider.getNext();
-    if(b != null){
-      stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+    stats.startWait();
+    try {
+      RawFragmentBatch b = provider.getNext();
+      if(b != null){
+        stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+      }
+      return b;
+    } finally {
+      stats.stopWait();
     }
-    stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext);
-    return b;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bb640b4..7535dcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private HashPartitionSender operator;
   private Partitioner partitioner;
   private FragmentContext context;
-  private OperatorContext oContext;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
-  private final SenderStats stats;
 
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException {
-
+    super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
     this.statusHandler = new StatusHandler(sendCount, context);
-    this.stats = new SenderStats(operator);
-    context.getStats().addOperatorStats(this.stats);
-    setStats(stats);
-    this.oContext = new OperatorContext(operator, context, stats);
   }
 
   @Override
@@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
       return false;
     }
 
-    RecordBatch.IterOutcome out = incoming.next();
+    RecordBatch.IterOutcome out = next(incoming);
+
     logger.debug("Partitioner.next(): got next record batch with status {}", out);
     switch(out){
       case NONE:
@@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
             partitioner.flushOutgoingBatches(false, true);
             partitioner.clear();
           }
-          // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code
           createPartitioner();
         } catch (IOException e) {
           incoming.kill();
@@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
               fieldId,
               WritableBatch.getBatchNoHVWrap(0, container, false));
       tunnel.sendRecordBatch(statusHandler, writableBatch);
+      stats.startWait();
+      try {
+        tunnel.sendRecordBatch(statusHandler, writableBatch);
+      } finally {
+        stats.stopWait();
+      }
       this.sendCount.increment();
       fieldId++;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 9bb24d4..6a26d30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -265,7 +265,12 @@ public abstract class PartitionerTemplate implements Partitioner {
                 oppositeMinorFragmentId,
                 getWritableBatch());
 
-        tunnel.sendRecordBatch(statusHandler, writableBatch);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(statusHandler, writableBatch);
+        } finally {
+          stats.stopWait();
+        }
         this.sendCount.increment();
       } else {
         logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
@@ -278,7 +283,12 @@ public abstract class PartitionerTemplate implements Partitioner {
                   operator.getOppositeMajorFragmentId(),
                   oppositeMinorFragmentId,
                   getWritableBatch());
-          tunnel.sendRecordBatch(statusHandler, writableBatch);
+          stats.startWait();
+          try {
+            tunnel.sendRecordBatch(statusHandler, writableBatch);
+          } finally {
+            stats.stopWait();
+          }
           this.sendCount.increment();
           vectorContainer.clear();
           return;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index d71b811..72a7d3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -66,7 +66,16 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   public final IterOutcome next(int inputIndex, RecordBatch b){
-    IterOutcome next = b.next();
+    // Time spent inside the upstream batch is not this operator's processing time, so the
+    // processing timer is paused around the call. The finally block resumes it even when
+    // b.next() throws, keeping the stop/start calls balanced.
+    stats.stopProcessing();
+    IterOutcome next;
+    try {
+      next = b.next();
+    } finally {
+      stats.startProcessing();
+    }
 
     switch(next){
     case OK_NEW_SCHEMA:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
index a1d4df9..2952c41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
@@ -23,15 +23,20 @@ import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
 
+import java.text.DateFormat;
 import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Locale;
 
 public class ProfileWrapper {
 
   NumberFormat format = NumberFormat.getInstance(Locale.US);
+  DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
 
   public QueryProfile profile;
 
@@ -46,10 +51,14 @@ public class ProfileWrapper {
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append("MAJOR FRAGMENTS\nid\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
+    builder.append("MAJOR FRAGMENTS\nid\tfirst start\tlast start\tfirst end\tlast end\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
     builder.append("\n");
     for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
-      builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), new MajorFragmentWrapper(majorProfile).toString()));
+      builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printOperatorsInMajor(majorProfile)));
+    }
+    builder.append("\n");
+    for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
+      builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printMinorFragmentsInMajor(majorProfile)));
     }
     return builder.toString();
   }
@@ -58,8 +67,12 @@ public class ProfileWrapper {
     StringBuilder builder = new StringBuilder();
     for (MajorFragmentProfile m : profile.getFragmentProfileList()) {
       List<Long> totalTimes = Lists.newArrayList();
+      List<Long> startTimes = Lists.newArrayList();
+      List<Long> endTimes = Lists.newArrayList();
       for (MinorFragmentProfile minorFragmentProfile : m.getMinorFragmentProfileList()) {
         totalTimes.add(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime());
+        startTimes.add(minorFragmentProfile.getStartTime());
+        endTimes.add(minorFragmentProfile.getEndTime());
       }
       long min = Collections.min(totalTimes);
       long max = Collections.max(totalTimes);
@@ -67,82 +80,120 @@ public class ProfileWrapper {
       for (Long l : totalTimes) {
         sum += l;
       }
+      long firstStart = Collections.min(startTimes);
+      long lastStart = Collections.max(startTimes);
+      long firstEnd = Collections.min(endTimes);
+      long lastEnd = Collections.max(endTimes);
       long avg = sum / totalTimes.size();
-      builder.append(String.format("%d\t%s\t%s\t%s\n", m.getMajorFragmentId(), format.format(min), format.format(avg), format.format(max)));
+      builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMajorFragmentId(), dateFormat.format(new Date(firstStart)),
+              dateFormat.format(new Date(lastStart)), dateFormat.format(new Date(firstEnd)), dateFormat.format(new Date(lastEnd)),
+              format.format(min), format.format(avg), format.format(max)));
     }
     return builder.toString();
   }
 
-  public class MajorFragmentWrapper {
-    MajorFragmentProfile majorFragmentProfile;
+  /**
+   * Builds a tab-separated table with one row per minor fragment of the given major fragment:
+   * id, start time, end time, total time (ms), and the incoming record and batch counts of the
+   * operator that received the most incoming records.
+   *
+   * @param majorFragmentProfile profile whose minor fragments are listed
+   * @return the header line followed by one line per minor fragment
+   */
+  public String printMinorFragmentsInMajor(MajorFragmentProfile majorFragmentProfile) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("id\tstart\tend\ttotal time (ms)\tmax records\tbatches\n");
+    for (MinorFragmentProfile m : majorFragmentProfile.getMinorFragmentProfileList()) {
+      long startTime = m.getStartTime();
+      long endTime = m.getEndTime();
 
-    public MajorFragmentWrapper(MajorFragmentProfile majorFragmentProfile) {
-      this.majorFragmentProfile = majorFragmentProfile;
-    }
+      // Accumulate in long: the stream counters are int64, and "+=" into an int truncates silently.
+      long biggestIncomingRecords = 0;
+      long biggestBatches = 0;
+      boolean seeded = false;
+      for (OperatorProfile oProfile : m.getOperatorProfileList()) {
+        long incomingRecordCount = 0;
+        long incomingBatchCount = 0;
+        for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+          incomingRecordCount += streamProfile.getRecords();
+          incomingBatchCount += streamProfile.getBatches();
+        }
+        // The first operator always seeds the maximum; later ones must strictly exceed it.
+        if (!seeded || incomingRecordCount > biggestIncomingRecords) {
+          seeded = true;
+          biggestIncomingRecords = incomingRecordCount;
+          biggestBatches = incomingBatchCount;
+        }
+      }
 
-    @Override
-    public String toString() {
-      return String.format("Minor Fragments\nid\ttotal time (ms)\n%s\nOperators\nid\ttype\tmin\tavg\tmax\t(time in ns)\n%s\n", new MinorFragmentsInMajor().toString(), new OperatorsInMajor().toString());
+      // A minor fragment without operator profiles is reported with 0 records and 0 batches.
+      builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\n", m.getMinorFragmentId(), dateFormat.format(new Date(startTime)),
+              dateFormat.format(new Date(endTime)), format.format(endTime - startTime), biggestIncomingRecords, biggestBatches));
     }
+    return builder.toString();
+  }
 
-    public class MinorFragmentsInMajor {
-
-      @Override
-      public String toString() {
-        StringBuilder builder = new StringBuilder();
-        for (MinorFragmentProfile minorFragmentProfile: majorFragmentProfile.getMinorFragmentProfileList()) {
-          builder.append(String.format("%d\t%s\n", minorFragmentProfile.getMinorFragmentId(), format.format(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime())));
+  public String printOperatorsInMajor(MajorFragmentProfile majorFragmentProfile) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("id\ttype\tp min\tp avg\tp max\ts min\ts avg\ts max\tw min\tw avg\tw max\n");
+    int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
+    int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
+    long[][] processing = new long[numOperators + 1][numFragments];
+    long[][] setup = new long[numOperators + 1][numFragments];
+    long[][] wait = new long[numOperators + 1][numFragments];
+    CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
+
+    for (int i = 0; i < numFragments; i++) {
+      MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
+      for (int j = 0; j < numOperators; j++) {
+        OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
+        int operatorId = operatorProfile.getOperatorId();
+        processing[operatorId][i] = operatorProfile.getProcessNanos();
+        setup[operatorId][i] = operatorProfile.getSetupNanos();
+        wait[operatorId][i] = operatorProfile.getWaitNanos();
+        if (i == 0) {
+          operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
         }
-        return builder.toString();
       }
     }
 
-    public class OperatorsInMajor {
-
-      @Override
-      public String toString() {
-        StringBuilder builder = new StringBuilder();
-        int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
-        int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
-        long[][] values = new long[numOperators + 1][numFragments];
-        CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
-
-        for (int i = 0; i < numFragments; i++) {
-          MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
-          for (int j = 0; j < numOperators; j++) {
-            OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
-            int operatorId = operatorProfile.getOperatorId();
-            values[operatorId][i] = operatorProfile.getProcessNanos() + operatorProfile.getSetupNanos();
-            if (i == 0) {
-              operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
-            }
-          }
-        }
-
-        for (int j = 0; j < numOperators + 1; j++) {
-          if (operatorTypes[j] == null) {
-            continue;
-          }
-          long min = Long.MAX_VALUE;
-          long max = Long.MIN_VALUE;
-          long sum = 0;
-
-          for (int i = 0; i < numFragments; i++) {
-            min = Math.min(min, values[j][i]);
-            max = Math.max(max, values[j][i]);
-            sum += values[j][i];
-          }
+    for (int j = 0; j < numOperators + 1; j++) {
+      if (operatorTypes[j] == null) {
+        continue;
+      }
+      long processingMin = Long.MAX_VALUE;
+      long processingMax = Long.MIN_VALUE;
+      long processingSum = 0;
+      long setupMin = Long.MAX_VALUE;
+      long setupMax = Long.MIN_VALUE;
+      long setupSum = 0;
+      long waitMin = Long.MAX_VALUE;
+      long waitMax = Long.MIN_VALUE;
+      long waitSum = 0;
+
+      for (int i = 0; i < numFragments; i++) {
+        processingMin = Math.min(processingMin, processing[j][i]);
+        processingMax = Math.max(processingMax, processing[j][i]);
+        processingSum += processing[j][i];
+
+        setupMin = Math.min(setupMin, setup[j][i]);
+        setupMax = Math.max(setupMax, setup[j][i]);
+        setupSum += setup[j][i];
+
+        waitMin = Math.min(waitMin, wait[j][i]);
+        waitMax = Math.max(waitMax, wait[j][i]);
+        waitSum += wait[j][i];
+      }
 
-          long avg = sum / numFragments;
+      long processingAvg = processingSum / numFragments;
+      long setupAvg = setupSum / numFragments;
+      long waitAvg = waitSum / numFragments;
 
-          builder.append(String.format("%d\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(), format.format(min), format.format(avg), format.format(max)));
-        }
-        return builder.toString();
-      }
+      builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(),
+              format.format(processingMin/1000/1000), format.format(processingAvg/1000/1000), format.format(processingMax/1000/1000),
+              format.format(setupMin/1000/1000), format.format(setupAvg/1000/1000), format.format(setupMax/1000/1000),
+              format.format(waitMin/1000/1000), format.format(waitAvg/1000/1000), format.format(waitMax/1000/1000)));
     }
+    return builder.toString();
   }
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index b4718bb..23fcf21 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -1755,6 +1755,8 @@ public final class SchemaUserBitShared
                 for(org.apache.drill.exec.proto.UserBitShared.MetricValue metric : message.getMetricList())
                     output.writeObject(8, metric, org.apache.drill.exec.proto.SchemaUserBitShared.MetricValue.WRITE, true);
 
+                if(message.hasWaitNanos())
+                    output.writeInt64(9, message.getWaitNanos(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.OperatorProfile message)
             {
@@ -1817,6 +1819,9 @@ public final class SchemaUserBitShared
                             builder.addMetric(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.MetricValue.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.MetricValue.MERGE));
 
                             break;
+                        case 9:
+                            builder.setWaitNanos(input.readInt64());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -1864,6 +1869,7 @@ public final class SchemaUserBitShared
                 case 6: return "processNanos";
                 case 7: return "localMemoryAllocated";
                 case 8: return "metric";
+                case 9: return "waitNanos";
                 default: return null;
             }
         }
@@ -1882,6 +1888,7 @@ public final class SchemaUserBitShared
             fieldMap.put("processNanos", 6);
             fieldMap.put("localMemoryAllocated", 7);
             fieldMap.put("metric", 8);
+            fieldMap.put("waitNanos", 9);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index c100968..faeba6f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -13836,6 +13836,16 @@ public final class UserBitShared {
      */
     org.apache.drill.exec.proto.UserBitShared.MetricValueOrBuilder getMetricOrBuilder(
         int index);
+
+    // optional int64 wait_nanos = 9;
+    /**
+     * <code>optional int64 wait_nanos = 9;</code>
+     */
+    boolean hasWaitNanos();
+    /**
+     * <code>optional int64 wait_nanos = 9;</code>
+     */
+    long getWaitNanos();
   }
   /**
    * Protobuf type {@code exec.shared.OperatorProfile}
@@ -13929,6 +13939,11 @@ public final class UserBitShared {
               metric_.add(input.readMessage(org.apache.drill.exec.proto.UserBitShared.MetricValue.PARSER, extensionRegistry));
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000020;
+              waitNanos_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14127,6 +14142,22 @@ public final class UserBitShared {
       return metric_.get(index);
     }
 
+    // optional int64 wait_nanos = 9;
+    public static final int WAIT_NANOS_FIELD_NUMBER = 9;
+    private long waitNanos_;
+    /**
+     * <code>optional int64 wait_nanos = 9;</code>
+     */
+    public boolean hasWaitNanos() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional int64 wait_nanos = 9;</code>
+     */
+    public long getWaitNanos() {
+      return waitNanos_;
+    }
+
     private void initFields() {
       inputProfile_ = java.util.Collections.emptyList();
       operatorId_ = 0;
@@ -14135,6 +14166,7 @@ public final class UserBitShared {
       processNanos_ = 0L;
       localMemoryAllocated_ = 0L;
       metric_ = java.util.Collections.emptyList();
+      waitNanos_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -14169,6 +14201,9 @@ public final class UserBitShared {
       for (int i = 0; i < metric_.size(); i++) {
         output.writeMessage(8, metric_.get(i));
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(9, waitNanos_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -14206,6 +14241,10 @@ public final class UserBitShared {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(8, metric_.get(i));
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(9, waitNanos_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -14346,6 +14385,8 @@ public final class UserBitShared {
         } else {
           metricBuilder_.clear();
         }
+        waitNanos_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -14412,6 +14453,10 @@ public final class UserBitShared {
         } else {
           result.metric_ = metricBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.waitNanos_ = waitNanos_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -14495,6 +14540,9 @@ public final class UserBitShared {
             }
           }
         }
+        if (other.hasWaitNanos()) {
+          setWaitNanos(other.getWaitNanos());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -15167,6 +15215,39 @@ public final class UserBitShared {
         return metricBuilder_;
       }
 
+      // optional int64 wait_nanos = 9;
+      private long waitNanos_ ;
+      /**
+       * <code>optional int64 wait_nanos = 9;</code>
+       */
+      public boolean hasWaitNanos() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional int64 wait_nanos = 9;</code>
+       */
+      public long getWaitNanos() {
+        return waitNanos_;
+      }
+      /**
+       * <code>optional int64 wait_nanos = 9;</code>
+       */
+      public Builder setWaitNanos(long value) {
+        bitField0_ |= 0x00000080;
+        waitNanos_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 wait_nanos = 9;</code>
+       */
+      public Builder clearWaitNanos() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        waitNanos_ = 0L;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.shared.OperatorProfile)
     }
 
@@ -16434,38 +16515,39 @@ public final class UserBitShared {
       "le\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003" +
       "\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_used" +
       "\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbit" +
-      "Endpoint\"\346\001\n\017OperatorProfile\0221\n\rinput_pr" +
+      "Endpoint\"\372\001\n\017OperatorProfile\0221\n\rinput_pr" +
       "ofile\030\001 \003(\0132\032.exec.shared.StreamProfile\022" +
       "\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 " +
       "\001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nano",
       "s\030\006 \001(\003\022\036\n\026local_memory_allocated\030\007 \001(\003\022" +
       "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" +
-      "e\"B\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007b" +
-      "atches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricV" +
-      "alue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 " +
-      "\001(\003\022\024\n\014double_value\030\003 \001(\001*5\n\nRpcChannel\022" +
-      "\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002" +
-      "*/\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010P" +
-      "HYSICAL\020\003*k\n\rFragmentState\022\013\n\007SENDING\020\000\022" +
-      "\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n",
-      "\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005*\362" +
-      "\004\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024" +
-      "\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH" +
-      "_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOI" +
-      "N\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020" +
-      "\007\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTI" +
-      "TION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\023\n\017RANDOM_REC" +
-      "EIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n" +
-      "\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING" +
-      "_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL",
-      "_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SOR" +
-      "T\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_" +
-      "SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOC" +
-      "K_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRE" +
-      "CT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_S" +
-      "UB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCH" +
-      "EMA_SUB_SCAN\020\036\022\013\n\007FLATTEN\020\037B.\n\033org.apach" +
-      "e.drill.exec.protoB\rUserBitSharedH\001"
+      "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" +
+      "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" +
+      "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " +
+      "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030" +
+      "\003 \001(\001*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010" +
+      "BIT_DATA\020\001\022\010\n\004USER\020\002*/\n\tQueryType\022\007\n\003SQL" +
+      "\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003*k\n\rFragmen" +
+      "tState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATI",
+      "ON\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCE" +
+      "LLED\020\004\022\n\n\006FAILED\020\005*\362\004\n\020CoreOperatorType\022" +
+      "\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001" +
+      "\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH" +
+      "_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITIO" +
+      "N_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVE" +
+      "R\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t\022\013\n\007PRO" +
+      "JECT\020\n\022\023\n\017RANDOM_RECEIVER\020\013\022\020\n\014RANGE_SEN" +
+      "DER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_RE" +
+      "MOVER\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_",
+      "N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t" +
+      "\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_G" +
+      "ROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM" +
+      "_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQ" +
+      "UET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEX" +
+      "T_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SU" +
+      "B_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\013\n\007FL" +
+      "ATTEN\020\037B.\n\033org.apache.drill.exec.protoB\r" +
+      "UserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -16549,7 +16631,7 @@ public final class UserBitShared {
           internal_static_exec_shared_OperatorProfile_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_shared_OperatorProfile_descriptor,
-              new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "LocalMemoryAllocated", "Metric", });
+              new java.lang.String[] { "InputProfile", "OperatorId", "OperatorType", "SetupNanos", "ProcessNanos", "LocalMemoryAllocated", "Metric", "WaitNanos", });
           internal_static_exec_shared_StreamProfile_descriptor =
             getDescriptor().getMessageTypes().get(13);
           internal_static_exec_shared_StreamProfile_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
index b9ed2e4..f1b1acc 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/OperatorProfile.java
@@ -56,6 +56,7 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
     private long processNanos;
     private long localMemoryAllocated;
     private List<MetricValue> metric;
+    private long waitNanos;
 
     public OperatorProfile()
     {
@@ -155,6 +156,19 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
         return this;
     }
 
+    // waitNanos
+
+    public long getWaitNanos()
+    {
+        return waitNanos;
+    }
+
+    public OperatorProfile setWaitNanos(long waitNanos)
+    {
+        this.waitNanos = waitNanos;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -236,6 +250,9 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
                     message.metric.add(input.mergeObject(null, MetricValue.getSchema()));
                     break;
 
+                case 9:
+                    message.waitNanos = input.readInt64();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -279,6 +296,9 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
             }
         }
 
+
+        if(message.waitNanos != 0)
+            output.writeInt64(9, message.waitNanos, false);
     }
 
     public String getFieldName(int number)
@@ -292,6 +312,7 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
             case 6: return "processNanos";
             case 7: return "localMemoryAllocated";
             case 8: return "metric";
+            case 9: return "waitNanos";
             default: return null;
         }
     }
@@ -312,6 +333,7 @@ public final class OperatorProfile implements Externalizable, Message<OperatorPr
         __fieldMap.put("processNanos", 6);
         __fieldMap.put("localMemoryAllocated", 7);
         __fieldMap.put("metric", 8);
+        __fieldMap.put("waitNanos", 9);
     }
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc1a7778/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index b754ee5..eb56efb 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -142,6 +142,7 @@ message OperatorProfile {
   optional int64 process_nanos = 6;
   optional int64 local_memory_allocated = 7;  
   repeated MetricValue metric = 8;
+  optional int64 wait_nanos = 9;
 }
 
 message StreamProfile {