You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/30 18:27:49 UTC

[2/5] git commit: DRILL-1077: OperationStats enhancements —

DRILL-1077: OperationStats enhancements —

Add setLongStat and setDoubleStat to set metrics directly.
Remove shared HashTableMetrics class
Add ResizeTime for hash aggregate and hash join.
Not all RootExec should report the metrics in SenderStats. Break up into RootExec specific MetricDefs.
Add BYTES_SENT metric for all RootExec
Rename N_SENDERS to N_RECEIVERS


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/de0bd7d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/de0bd7d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/de0bd7d2

Branch: refs/heads/master
Commit: de0bd7d27f3545f82541f8d98719042718923c14
Parents: df8d4e6
Author: Cliff Buchanan <cb...@maprtech.com>
Authored: Wed Jun 25 14:31:30 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 26 17:35:12 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/OperatorStats.java    |  8 +++
 .../org/apache/drill/exec/ops/SenderStats.java  | 74 --------------------
 .../drill/exec/physical/impl/BaseRootExec.java  |  8 ++-
 .../drill/exec/physical/impl/ScreenCreator.java | 15 ++++
 .../exec/physical/impl/SingleSenderCreator.java | 17 ++++-
 .../impl/aggregate/HashAggTemplate.java         | 25 +++++--
 .../BroadcastSenderRootExec.java                | 18 ++++-
 .../physical/impl/common/HashTableMetrics.java  | 33 ---------
 .../physical/impl/common/HashTableStats.java    |  1 +
 .../physical/impl/common/HashTableTemplate.java |  7 +-
 .../exec/physical/impl/join/HashJoinBatch.java  | 24 +++++--
 .../impl/materialize/QueryWritableBatch.java    |  8 +++
 .../PartitionSenderRootExec.java                | 41 ++++++++++-
 .../partitionsender/PartitionSenderStats.java   | 35 ---------
 .../partitionsender/PartitionerTemplate.java    |  6 ++
 .../exec/record/FragmentWritableBatch.java      |  8 +++
 16 files changed, 170 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index bd8d899..285c6c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -174,5 +174,13 @@ public class OperatorStats {
   public void addDoubleStat(MetricDef metric, double value){
     doubleMetrics.putOrAdd(metric.metricId(), value, value);
   }
+  
+  public void setLongStat(MetricDef metric, long value){
+    longMetrics.put(metric.metricId(), value);
+  }
+
+  public void setDoubleStat(MetricDef metric, double value){
+    doubleMetrics.put(metric.metricId(), value);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
deleted file mode 100644
index 5167edb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.ops;
-
-import java.util.List;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderStats;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
-import org.apache.drill.exec.proto.UserBitShared;
-
-public class SenderStats extends OperatorStats {
-
-  long minReceiverRecordCount = 0;
-  long maxReceiverRecordCount = 0;
-  int nSenders = 0;
-
-  public SenderStats(PhysicalOperator operator, BufferAllocator allocator) {
-    super(new OpProfileDef(operator.getOperatorId(), operator.getOperatorType(), OperatorContext.getChildCount(operator)), allocator);
-  }
-
-  public void updatePartitionStats(List<? extends PartitionStatsBatch> outgoing) {
-
-    for (PartitionStatsBatch o : outgoing) {
-      long totalRecords = o.getTotalRecords();
-
-      minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
-      maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
-    }
-    nSenders = outgoing.size();
-  }
-
-  @Override
-  public UserBitShared.OperatorProfile getProfile() {
-    final UserBitShared.OperatorProfile.Builder b = UserBitShared.OperatorProfile //
-        .newBuilder() //
-        .setOperatorType(operatorType) //
-        .setOperatorId(operatorId) //
-        .setSetupNanos(setupNanos) //
-        .setProcessNanos(processingNanos);
-
-    addAllMetrics(b);
-
-    return b.build();
-
-  }
-
-  public void addAllMetrics(UserBitShared.OperatorProfile.Builder b) {
-    super.addAllMetrics(b);
-
-    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(minReceiverRecordCount).
-        setMetricId(PartitionSenderStats.MIN_RECORDS.metricId()));
-    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(maxReceiverRecordCount).
-        setMetricId(PartitionSenderStats.MAX_RECORDS.metricId()));
-    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(nSenders)
-        .setMetricId(PartitionSenderStats.N_SENDERS.metricId()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index d56da51..fa6c997 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -19,21 +19,23 @@ package org.apache.drill.exec.physical.impl;
 
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public abstract class BaseRootExec implements RootExec {
 
-  protected SenderStats stats = null;
+  protected OperatorStats stats = null;
   protected OperatorContext oContext = null;
 
   public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = new OperatorContext(config, context, stats);
-    this.stats = new SenderStats(config, oContext.getAllocator());
+    stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
+        config.getOperatorType(), OperatorContext.getChildCount(config)),
+        oContext.getAllocator());
     context.getStats().addOperatorStats(this.stats);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 9ad85af..146d72d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
@@ -66,6 +67,15 @@ public class ScreenCreator implements RootCreator<Screen>{
     private RecordMaterializer materializer;
     private boolean first = true;
 
+    public enum Metric implements MetricDef {
+      BYTES_SENT;
+      
+      @Override
+      public int metricId() {
+        return ordinal();
+      }
+    }
+    
     public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
       super(context, config);
       assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
@@ -138,6 +148,7 @@ public class ScreenCreator implements RootCreator<Screen>{
 //        context.getStats().batchesCompleted.inc(1);
 //        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
         QueryWritableBatch batch = materializer.convertNext(false);
+        updateStats(batch);
         stats.startWait();
         try {
           connection.sendResult(listener, batch);
@@ -152,6 +163,10 @@ public class ScreenCreator implements RootCreator<Screen>{
         throw new UnsupportedOperationException();
       }
     }
+    
+    public void updateStats(QueryWritableBatch queryBatch) {
+      stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
+    }
 
     @Override
     public void stop() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 1b63112..396f7a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -24,8 +24,9 @@ import java.util.List;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -57,6 +58,15 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private volatile boolean ok = true;
     private final SendingAccountor sendCount = new SendingAccountor();
 
+    public enum Metric implements MetricDef {
+      BYTES_SENT;
+      
+      @Override
+      public int metricId() {
+        return ordinal();
+      }
+    }
+    
     public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
       super(context, config);
       this.incoming = batch;
@@ -95,6 +105,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(),
                 handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        updateStats(batch);
         sendCount.increment();
         stats.startWait();
         try {
@@ -109,6 +120,10 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
         throw new IllegalStateException();
       }
     }
+    
+    public void updateStats(FragmentWritableBatch writableBatch) {
+      stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); 
+    }
 
     @Override
     public void stop() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1afa5ae..eaf2811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -36,12 +36,12 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.physical.impl.common.HashTableMetrics;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
@@ -99,6 +99,22 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private OperatorStats stats = null;
   private HashTableStats htStats = new HashTableStats();
+  
+  public enum Metric implements MetricDef {
+
+    NUM_BUCKETS,
+    NUM_ENTRIES,
+    NUM_RESIZING,
+    RESIZING_TIME;
+    
+    // duplicate for hash ag
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
+
 
   public class BatchHolder {
 
@@ -550,9 +566,10 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private void updateStats(HashTable htable) {
     htable.getStats(htStats);
-    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets);
-    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries);
-    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing);
+    this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+    this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+    this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+    this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
   }
 
   // Code-generated methods (implemented in HashAggBatch)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index a70cd50..bdaf8ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -23,8 +23,9 @@ import java.util.List;
 
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.BroadcastSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -55,6 +56,15 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   private final ExecProtos.FragmentHandle handle;
   private volatile boolean ok;
   private final RecordBatch incoming;
+  
+  public enum Metric implements MetricDef {
+    N_RECEIVERS,
+    BYTES_SENT;
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
 
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
@@ -106,6 +116,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
         }
         for (int i = 0; i < tunnels.length; ++i) {
           FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
+          updateStats(batch);
           stats.startWait();
           try {
             tunnels[i].sendRecordBatch(this.statusHandler, batch);
@@ -122,6 +133,11 @@ public class BroadcastSenderRootExec extends BaseRootExec {
         throw new IllegalStateException();
     }
   }
+  
+  public void updateStats(FragmentWritableBatch writableBatch) {
+    stats.setLongStat(Metric.N_RECEIVERS, tunnels.length);
+    stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); 
+  }
 
   /*
   private boolean waitAllFutures(boolean haltOnError) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java
deleted file mode 100644
index ee84855..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableMetrics.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.common;
-
-import org.apache.drill.exec.ops.MetricDef;
-
-public enum HashTableMetrics implements MetricDef {
-
-  HTABLE_NUM_BUCKETS,
-  HTABLE_NUM_ENTRIES,
-  HTABLE_NUM_RESIZING;
-
-  @Override
-  public int metricId() {
-    return ordinal();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
index 848d860..c494c85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
@@ -22,6 +22,7 @@ public class HashTableStats {
   public int numBuckets;
   public int numEntries;
   public int numResizing;
+  public int resizingTime;
 
   public HashTableStats() {
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 099d2ee..0849e6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -92,6 +92,8 @@ public abstract class HashTableTemplate implements HashTable {
 
   private int numResizing = 0;
   
+  private int resizingTime = 0;
+  
   // This class encapsulates the links, keys and values for up to BATCH_SIZE
   // *unique* records. Thus, suppose there are N incoming record batches, each
   // of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -408,6 +410,7 @@ public abstract class HashTableTemplate implements HashTable {
     stats.numBuckets = numBuckets();
     stats.numEntries = numEntries;
     stats.numResizing = numResizing;
+    stats.resizingTime = resizingTime;
   }
 
   public boolean isEmpty() {
@@ -599,6 +602,8 @@ public abstract class HashTableTemplate implements HashTable {
   private void resizeAndRehashIfNeeded() {
     if (numEntries < threshold)
       return;
+    
+    long t0 = System.currentTimeMillis();
 
     if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
 
@@ -640,7 +645,7 @@ public abstract class HashTableTemplate implements HashTable {
         bh.dump(idx);
       }
     }
-
+    resizingTime += System.currentTimeMillis() - t0;
     numResizing++;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index bd0e23f..2e33d50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -38,11 +38,11 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.physical.impl.common.HashTableMetrics;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
@@ -137,6 +137,21 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     private final HashTableStats htStats = new HashTableStats();
 
+    public enum Metric implements MetricDef {
+
+      NUM_BUCKETS,
+      NUM_ENTRIES,
+      NUM_RESIZING,
+      RESIZING_TIME;
+      
+      // duplicate for hash ag
+
+      @Override
+      public int metricId() {
+        return ordinal();
+      }
+    }
+    
     @Override
     public int getRecordCount() {
         return outputRecords;
@@ -443,9 +458,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     private void updateStats(HashTable htable) {
       if(htable == null) return;
       htable.getStats(htStats);
-      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets);
-      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries);
-      this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing);
+      this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
+      this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
+      this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
+      this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index c219cce..9d4a629 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -48,6 +48,14 @@ public class QueryWritableBatch {
     return buffers;
   }
 
+  public long getByteCount() {
+    long n = 0;
+    for (ByteBuf buf : buffers) {
+      n += buf.readableBytes();
+    }
+    return n;
+  }
+  
   public QueryResult getHeader() {
     return header;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5fcfdcc..1820cd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -29,13 +30,17 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.MetricValue;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -61,6 +66,23 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
+  
+  long minReceiverRecordCount = Long.MAX_VALUE;
+  long maxReceiverRecordCount = Long.MIN_VALUE;
+  
+  public enum Metric implements MetricDef {
+    BATCHES_SENT,
+    RECORDS_SENT,
+    MIN_RECORDS,
+    MAX_RECORDS,
+    N_RECEIVERS,
+    BYTES_SENT;
+
+    @Override
+    public int metricId() {
+      return ordinal();
+    }
+  }
 
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
@@ -138,7 +160,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           context.fail(e);
           return false;
         }
-        stats.updatePartitionStats(partitioner.getOutgoingBatches());
+        updateStats(partitioner.getOutgoingBatches());
         for (VectorWrapper<?> v : incoming) {
           v.clear();
         }
@@ -187,6 +209,21 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }
   }
 
+  public void updateStats(List<? extends PartitionStatsBatch> outgoing) {
+    long records = 0;
+    for (PartitionStatsBatch o : outgoing) {
+      long totalRecords = o.getTotalRecords();
+      minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
+      maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
+      records += totalRecords;
+    }
+    stats.addLongStat(Metric.BATCHES_SENT, 1);
+    stats.addLongStat(Metric.RECORDS_SENT, records);
+    stats.setLongStat(Metric.MIN_RECORDS, minReceiverRecordCount);
+    stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount);
+    stats.setLongStat(Metric.N_RECEIVERS, outgoing.size());
+  }
+  
   public void stop() {
     logger.debug("Partition sender stopping.");
     ok = false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
deleted file mode 100644
index de5967f..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import org.apache.drill.exec.ops.MetricDef;
-
-public enum PartitionSenderStats implements MetricDef {
-
-  BATCHES_SENT,
-  RECORDS_SENT,
-  MIN_RECORDS,
-  MAX_RECORDS,
-  N_SENDERS;
-
-  @Override
-  public int metricId() {
-    return ordinal();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 0d967b5..8b63c5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -264,6 +265,7 @@ public abstract class PartitionerTemplate implements Partitioner {
                 oppositeMinorFragmentId,
                 getWritableBatch());
 
+        updateStats(writableBatch);
         stats.startWait();
         try {
           tunnel.sendRecordBatch(statusHandler, writableBatch);
@@ -306,6 +308,10 @@ public abstract class PartitionerTemplate implements Partitioner {
         throw new IOException(statusHandler.getException());
       }
     }
+    
+    public void updateStats(FragmentWritableBatch writableBatch) {
+      stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
+    }
 
     public void initializeBatch() {
       isLast = false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de0bd7d2/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index ef7b5f2..e85eab1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -78,6 +78,14 @@ public class FragmentWritableBatch{
   public ByteBuf[] getBuffers(){
     return buffers;
   }
+  
+  public long getByteCount() {
+    long n = 0;
+    for (ByteBuf buf : buffers) {
+      n += buf.readableBytes();
+    }
+    return n;
+  }
 
   public FragmentRecordBatch getHeader() {
     return header;