You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:52:16 UTC

[31/61] [abbrv] git commit: Add BaseRootExec which will enable collection of stats for Senders Add SenderStats that collects stats specific to senders.

Add BaseRootExec which will enable collection of stats for Senders
Add SenderStats that collects stats specific to senders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9b22d2c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9b22d2c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9b22d2c3

Branch: refs/heads/master
Commit: 9b22d2c37d638451149270a19fe1b2e63d8ea670
Parents: 3e98ffc
Author: Mehant Baid <me...@gmail.com>
Authored: Fri May 23 10:29:48 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jun 8 19:13:06 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentStats.java    |  4 ++
 .../org/apache/drill/exec/ops/OpProfileDef.java |  5 ++
 .../apache/drill/exec/ops/OperatorContext.java  | 13 ++--
 .../apache/drill/exec/ops/OperatorStats.java    | 32 ++++++---
 .../org/apache/drill/exec/ops/SenderStats.java  | 73 ++++++++++++++++++++
 .../drill/exec/physical/impl/BaseRootExec.java  | 21 +++---
 .../PartitionSenderRootExec.java                | 12 +++-
 .../partitionsender/PartitionSenderStats.java   |  5 +-
 .../partitionsender/PartitionStatsBatch.java    | 24 +++++++
 .../impl/partitionsender/Partitioner.java       |  3 +
 .../partitionsender/PartitionerTemplate.java    | 15 +++-
 11 files changed, 175 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index d667794..19ac0aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -54,4 +54,8 @@ public class FragmentStats {
     return stats;
   }
 
+  public void addOperatorStats(OperatorStats stats) {
+    operators.add(stats);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
index fb68e4a..b5c8d86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java
@@ -23,6 +23,11 @@ public class OpProfileDef {
   public int operatorType;
   public int incomingCount;
 
+  public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
+    this.operatorId = operatorId;
+    this.operatorType = operatorType;
+    this.incomingCount = incomingCount;
+  }
   public int getOperatorId(){
     return operatorId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 116b616..d62ea2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -36,14 +36,17 @@ public class OperatorContext implements Closeable {
     this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
     this.popConfig = popConfig;
 
-    OpProfileDef def = new OpProfileDef();
-    def.operatorId = popConfig.getOperatorId();
-    def.incomingCount = getChildCount(popConfig);
-    def.operatorType = popConfig.getOperatorType();
+    OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
     this.stats = context.getStats().getOperatorStats(def);
   }
 
-  private static int getChildCount(PhysicalOperator popConfig){
+  public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) throws OutOfMemoryException {
+    this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
+    this.popConfig = popConfig;
+    this.stats     = stats;
+  }
+
+  public static int getChildCount(PhysicalOperator popConfig){
     Iterator<PhysicalOperator> iter = popConfig.iterator();
     int i = 0;
     while(iter.hasNext()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index cde1876..4ac8f74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -28,8 +28,8 @@ import com.carrotsearch.hppc.IntLongOpenHashMap;
 public class OperatorStats {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class);
 
-  private final int operatorId;
-  private final int operatorType;
+  protected final int operatorId;
+  protected final int operatorType;
 
   private IntLongOpenHashMap longMetrics = new IntLongOpenHashMap();
   private IntDoubleOpenHashMap doubleMetrics = new IntDoubleOpenHashMap();
@@ -42,8 +42,8 @@ public class OperatorStats {
   private boolean inProcessing = false;
   private boolean inSetup = false;
 
-  private long processingNanos;
-  private long setupNanos;
+  protected long processingNanos;
+  protected long setupNanos;
 
   private long processingMark;
   private long setupMark;
@@ -105,23 +105,37 @@ public class OperatorStats {
         .setSetupNanos(setupNanos) //
         .setProcessNanos(processingNanos);
 
+    addAllMetrics(b);
+
+    return b.build();
+  }
+
+  public void addAllMetrics(OperatorProfile.Builder builder) {
+    addStreamProfile(builder);
+    addLongMetrics(builder);
+    addDoubleMetrics(builder);
+  }
+
+  public void addStreamProfile(OperatorProfile.Builder builder) {
     for(int i = 0; i < recordsReceivedByInput.length; i++){
-      b.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i]));
+      builder.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i]));
     }
+  }
 
+  public void addLongMetrics(OperatorProfile.Builder builder) {
     for(int i =0; i < longMetrics.allocated.length; i++){
       if(longMetrics.allocated[i]){
-        b.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]));
+        builder.addMetric(MetricValue.newBuilder().setMetricId(longMetrics.keys[i]).setLongValue(longMetrics.values[i]));
       }
     }
+  }
 
+  public void addDoubleMetrics(OperatorProfile.Builder builder) {
     for(int i =0; i < doubleMetrics.allocated.length; i++){
       if(doubleMetrics.allocated[i]){
-        b.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]));
+        builder.addMetric(MetricValue.newBuilder().setMetricId(doubleMetrics.keys[i]).setDoubleValue(doubleMetrics.values[i]));
       }
     }
-
-    return b.build();
   }
 
   public void addLongStat(MetricDef metric, long value){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
new file mode 100644
index 0000000..c766632
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderStats;
+import org.apache.drill.exec.proto.UserBitShared;
+
+import java.util.List;
+
+public class SenderStats extends OperatorStats {
+
+  long minReceiverRecordCount = 0;
+  long maxReceiverRecordCount = 0;
+  int nSenders = 0;
+
+  public SenderStats(PhysicalOperator operator) {
+    super(new OpProfileDef(operator.getOperatorId(), operator.getOperatorType(), OperatorContext.getChildCount(operator)));
+  }
+
+  public void updatePartitionStats(List<? extends PartitionStatsBatch> outgoing) {
+
+    for (PartitionStatsBatch o : outgoing) {
+      long totalRecords = o.getTotalRecords();
+
+      minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
+      maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
+    }
+    nSenders = outgoing.size();
+  }
+
+  @Override
+  public UserBitShared.OperatorProfile getProfile() {
+    final UserBitShared.OperatorProfile.Builder b = UserBitShared.OperatorProfile //
+        .newBuilder() //
+        .setOperatorType(operatorType) //
+        .setOperatorId(operatorId) //
+        .setSetupNanos(setupNanos) //
+        .setProcessNanos(processingNanos);
+
+    addAllMetrics(b);
+
+    return b.build();
+
+  }
+
+  public void addAllMetrics(UserBitShared.OperatorProfile.Builder b) {
+    super.addAllMetrics(b);
+
+    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(minReceiverRecordCount).
+        setMetricId(PartitionSenderStats.MIN_RECORDS.metricId()));
+    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(maxReceiverRecordCount).
+        setMetricId(PartitionSenderStats.MAX_RECORDS.metricId()));
+    b.addMetric(UserBitShared.MetricValue.newBuilder().setLongValue(nSenders)
+        .setMetricId(PartitionSenderStats.N_SENDERS.metricId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 0db8c07..256c106 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,25 +17,16 @@
  */
 package org.apache.drill.exec.physical.impl;
 
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.record.RecordBatch;
 
-public abstract class BaseRootExec<T extends PhysicalOperator> implements RootExec {
+public abstract class BaseRootExec implements RootExec {
 
-  protected final OperatorStats stats;
-  protected final OperatorContext oContext;
-
-  public BaseRootExec(FragmentContext context, T operator) throws OutOfMemoryException {
-    oContext = new OperatorContext(operator, context);
-    stats = oContext.getStats();
-  }
+  protected OperatorStats stats = null;
 
   @Override
   public final boolean next() {
+    // Stats should have been initialized
+    assert stats != null;
     try {
       stats.startProcessing();
       return innerNext();
@@ -44,5 +35,9 @@ public abstract class BaseRootExec<T extends PhysicalOperator> implements RootEx
     }
   }
 
+  public void setStats(OperatorStats stats) {
+    this.stats = stats;
+  }
+
   public abstract boolean innerNext();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5476a50..9be45d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -32,11 +32,13 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionerTemplate.OutgoingRecordBatch;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.*;
@@ -48,7 +50,6 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JType;
 import org.apache.drill.exec.vector.CopyUtil;
 
-
 public class PartitionSenderRootExec extends BaseRootExec {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
@@ -56,24 +57,28 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private HashPartitionSender operator;
   private Partitioner partitioner;
   private FragmentContext context;
+  private OperatorContext oContext;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
-
+  private final SenderStats stats;
 
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException {
 
-    super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
     this.statusHandler = new StatusHandler(sendCount, context);
+    this.stats = new SenderStats(operator);
+    context.getStats().addOperatorStats(this.stats);
+    setStats(stats);
+    this.oContext = new OperatorContext(operator, context, stats);
   }
 
   @Override
@@ -140,6 +145,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           context.fail(e);
           return false;
         }
+        stats.updatePartitionStats(partitioner.getOutgoingBatches());
         for (VectorWrapper v : incoming) {
           v.clear();
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
index 4790596..de5967f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderStats.java
@@ -22,7 +22,10 @@ import org.apache.drill.exec.ops.MetricDef;
 public enum PartitionSenderStats implements MetricDef {
 
   BATCHES_SENT,
-  RECORDS_SENT;
+  RECORDS_SENT,
+  MIN_RECORDS,
+  MAX_RECORDS,
+  N_SENDERS;
 
   @Override
   public int metricId() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
new file mode 100644
index 0000000..85ccffb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+
+public interface PartitionStatsBatch {
+
+  public long getTotalRecords();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 6958403..53528ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -25,8 +25,10 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
 
 import java.io.IOException;
+import java.util.List;
 
 public interface Partitioner {
 
@@ -42,6 +44,7 @@ public interface Partitioner {
   public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
   public abstract void initialize();
   public abstract void clear();
+  public abstract List<? extends PartitionStatsBatch> getOutgoingBatches();
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9b22d2c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 510327a..1e6e71b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -66,6 +66,11 @@ public abstract class PartitionerTemplate implements Partitioner {
   public PartitionerTemplate() throws SchemaChangeException {
   }
 
+  @Override
+  public List<? extends PartitionStatsBatch> getOutgoingBatches() {
+    return outgoingBatches;
+  }
+
   public final void setup(FragmentContext context,
                           RecordBatch incoming,
                           HashPartitionSender popConfig,
@@ -189,7 +194,7 @@ public abstract class PartitionerTemplate implements Partitioner {
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
   public abstract int doEval(@Named("inIndex") int inIndex);
 
-  public class OutgoingRecordBatch implements VectorAccessible {
+  public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible {
 
     private final DataTunnel tunnel;
     private final HashPartitionSender operator;
@@ -202,6 +207,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     private boolean isLast = false;
     private BatchSchema outSchema;
     private int recordCount;
+    private int totalRecords;
     private OperatorStats stats;
     private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
     private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
@@ -224,6 +230,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     protected boolean copy(int inIndex) throws IOException {
       if (doEval(inIndex, recordCount)) {
         recordCount++;
+        totalRecords++;
         if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
           flush();
         }
@@ -330,6 +337,12 @@ public abstract class PartitionerTemplate implements Partitioner {
       return recordCount;
     }
 
+
+    @Override
+    public long getTotalRecords() {
+      return totalRecords;
+    }
+
     @Override
     public TypedFieldId getValueVectorId(SchemaPath path) {
       return vectorContainer.getValueVectorId(path);