You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/09/09 05:05:26 UTC

[drill] 01/05: DRILL-6709: Extended the batch stats utility to other operators

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 85ebae5f1b447d1ec60e062ab9e00da7f3d186fb
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Fri Aug 24 08:55:52 2018 -0700

    DRILL-6709: Extended the batch stats utility to other operators
    
    closes #1444
---
 .../apache/drill/exec/physical/impl/ScanBatch.java |   3 +-
 .../exec/physical/impl/aggregate/HashAggBatch.java |  26 ++---
 .../physical/impl/aggregate/HashAggTemplate.java   |   8 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |  28 ++---
 .../exec/physical/impl/join/HashJoinBatch.java     |  45 ++++----
 .../exec/physical/impl/join/LateralJoinBatch.java  |  57 +++++-----
 .../exec/physical/impl/join/MergeJoinBatch.java    |  48 +++++----
 .../physical/impl/join/NestedLoopJoinBatch.java    |  32 +++---
 .../physical/impl/join/NestedLoopJoinTemplate.java |   7 +-
 .../impl/project/ProjectMemoryManager.java         |   8 +-
 .../physical/impl/project/ProjectRecordBatch.java  |   7 +-
 .../physical/impl/union/UnionAllRecordBatch.java   |  44 ++++----
 .../physical/impl/unnest/UnnestRecordBatch.java    |  26 +++--
 .../drill/exec/record/AbstractRecordBatch.java     |  11 ++
 .../batchsizing/OverflowSerDeUtil.java             |   6 +-
 .../drill/exec/util/record/RecordBatchStats.java   | 118 ++++++++++++++++++---
 16 files changed, 300 insertions(+), 174 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cda98fc..cd24a4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
@@ -302,7 +303,7 @@ public class ScanBatch implements CloseableRecordBatch {
       return; // NOOP
     }
 
-    RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
   }
 
   /** Might truncate the FQN if too long */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 28f8263..9de9aae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -60,6 +60,8 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -159,9 +161,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       }
 
       updateIncomingStats();
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
-      }
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
     }
   }
 
@@ -204,7 +204,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     }
 
     hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "configured output batch size: %d", configuredBatchSize);
 
     columnMapping = CaseInsensitiveMap.newHashMap();
   }
@@ -474,15 +476,15 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_ROW_BYTES, hashAggMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(HashAggTemplate.Metric.OUTPUT_RECORD_COUNT, hashAggMemoryManager.getTotalOutputRecords());
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
-        hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
+      hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
 
-      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
-        hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
+      hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
   }
   @Override
   public void close() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 65ca829..e8ae30e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -70,7 +70,8 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 
 import org.apache.drill.exec.vector.FixedWidthVector;
@@ -1223,10 +1224,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(outgoing));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());
 
     this.outcome = IterOutcome.OK;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index d634fb2..1623319 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -52,6 +52,8 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -157,7 +159,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
 
-      logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
 
       updateIncomingStats();
     }
@@ -170,7 +172,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
 
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "configured output batch size: %d", configuredBatchSize);
   }
 
   @Override
@@ -261,10 +264,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
 
     flattenMemoryManager.updateOutgoingStats(outputRecords);
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
     // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
     // consumed in current output batch or not
@@ -516,15 +516,15 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
-        flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
+      flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
 
-      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
-        flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
+      flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index dc40b24..368bb5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -68,9 +68,10 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -291,7 +292,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       buildBatch,
       () -> {
         batchMemoryManager.update(RIGHT_INDEX, 0, true);
-        logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
       });
   }
 
@@ -306,7 +307,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       probeBatch,
       () -> {
         batchMemoryManager.update(LEFT_INDEX, 0);
-        logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
       });
   }
 
@@ -488,9 +489,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
           container.setRecordCount(outputRecords);
 
           batchMemoryManager.updateOutgoingStats(outputRecords);
-          if (logger.isDebugEnabled()) {
-            logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-          }
+          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
           /* We are here because of one the following
            * 1. Completed processing of all the records and we are done
@@ -1121,12 +1120,17 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
     int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
-    logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
-      configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
 
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
+      configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
 
     batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d", configuredBatchSize);
+
     enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
   }
 
@@ -1242,19 +1246,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
     updateMetrics();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
 
     this.cleanup();
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 5b7fd9c..1aaf5e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -240,25 +242,26 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   public void close() {
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
-        "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
-        "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
-        "record count : {}", batchMemoryManager.getNumOutgoingBatches(),
-        batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(),
-        batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
+      "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
+      "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
+      "record count : %d", batchMemoryManager.getNumOutgoingBatches(),
+      batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(),
+      batchMemoryManager.getTotalOutputRecords());
 
     super.close();
   }
@@ -733,11 +736,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-      logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]",
         outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
-    }
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
@@ -1182,10 +1184,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
     final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
-        batchMemoryManager.getRecordBatchSizer(inputIndex));
-      logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
+    if (isRecordBatchStatsLoggingEnabled()) {
+      RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
+      RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount);
     }
 
     if (useMemoryManager) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 7b14e03..72f776a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -58,6 +58,8 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
@@ -123,7 +125,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     @Override
     public void update(int inputIndex) {
       status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
-      logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
+      RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
+      RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
     }
   }
 
@@ -134,7 +137,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);
 
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d", configuredBatchSize);
 
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not support cartesian join.  This join operator was configured with 0 conditions");
@@ -271,10 +275,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
       vw.getValueVector().getMutator().setValueCount(getRecordCount());
     }
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
-
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     batchMemoryManager.updateOutgoingStats(getRecordCount());
   }
 
@@ -282,23 +283,24 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
   public void close() {
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
 
     super.close();
     leftIterator.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 8054d7f..e2f93ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -52,8 +52,9 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.record.JoinBatchMemoryManager;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -133,7 +134,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+        "configured output batch size: %d", configuredBatchSize);
   }
 
   /**
@@ -168,7 +171,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
           case OK:
             // For right side, use aggregate i.e. average row width across batches
             batchMemoryManager.update(RIGHT_INDEX, 0, true);
-            logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+            RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+              batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
             addBatchToHyperContainer(right);
             break;
           case OUT_OF_MEMORY:
@@ -202,10 +206,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     container.setRecordCount(outputRecords);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
-
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     logger.debug("Number of records emitted: " + outputRecords);
 
     return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -357,7 +358,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
       }
 
       batchMemoryManager.update(RIGHT_INDEX, 0, true);
-      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+        batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
 
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
@@ -395,7 +397,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
       }
 
       batchMemoryManager.update(LEFT_INDEX, 0);
-      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+        batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
 
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
@@ -422,23 +425,24 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
   public void close() {
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
               batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
               batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
               batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
               batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
               batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
               batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
               batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
               batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
               batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
               batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
 
     rightContainer.clear();
     rightCounts.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index adf681b..3b8ab8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -24,7 +24,8 @@ import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import javax.inject.Named;
 import java.util.LinkedList;
 import java.util.List;
@@ -192,7 +193,9 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
         break;
       case OK:
         setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
-        logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+          outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
+          outgoing.getRecordBatchStatsContext());
         leftRecordCount = left.getRecordCount();
         break;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 6b273d8..81e54db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -28,6 +28,8 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -136,7 +138,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
     public ProjectMemoryManager(int configuredOutputSize) {
         super(configuredOutputSize);
         outputColumnSizes = new HashMap<>();
-        logger.debug("BATCH_STATS, configuredOutputSize: {}", configuredOutputSize);
     }
 
     public boolean isComplex(MajorType majorType) {
@@ -253,6 +254,9 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
         setIncomingBatch(incomingBatch);
         setOutgoingBatch(outgoingBatch);
         reset();
+
+        RecordBatchStats.logRecordBatchStats(outgoingBatch.getRecordBatchStatsContext(),
+          "configuredOutputSize: %d", getOutputBatchSize());
     }
 
     private void reset() {
@@ -321,7 +325,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
                     rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
                     (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
 
-        logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
         updateIncomingStats();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 247f36f..4d55f00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -55,13 +55,14 @@ import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.UntypedNullHolder;
@@ -252,7 +253,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
 
     memoryManager.updateOutgoingStats(outputRecords);
-    logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
     // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
     // consumed in current output batch or not
@@ -298,7 +299,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
 
     memoryManager.updateOutgoingStats(projRecords);
-    logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
   }
 
   public void addComplexWriter(final ComplexWriter writer) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 8df83ee..7e16d6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -43,13 +43,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
-import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.resolver.TypeCastRules;
 import org.apache.drill.exec.util.VectorUtil;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
@@ -76,7 +77,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "configured output batch size: %d", configuredBatchSize);
   }
 
   @Override
@@ -171,9 +174,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     batchStatus.recordsProcessed += recordCount;
     batchMemoryManager.updateOutgoingStats(recordCount);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
 
     if (callBack.getSchemaChangedAndReset()) {
       return IterOutcome.OK_NEW_SCHEMA;
@@ -368,8 +369,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
         if (topStatus.prefetched) {
           topStatus.prefetched = false;
           batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
-          logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
-            batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
+          RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT,
+            batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
+            getRecordBatchStatsContext());
           return Pair.of(topStatus.outcome, topStatus);
         } else {
 
@@ -387,8 +389,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
             topStatus.recordsProcessed = 0;
             topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
             batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
-            logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
-              batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
+            RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT,
+              batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
+              getRecordBatchStatsContext());
             return Pair.of(outcome, topStatus);
           case OUT_OF_MEMORY:
           case STOP:
@@ -421,19 +424,20 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     super.close();
     updateBatchMemoryManagerStats();
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
 
-      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
 
-      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
-    }
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d",
+      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index d58d242..5f63967 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
@@ -121,10 +123,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       // Limit to lower bound of total number of rows possible for this batch
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
-
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
-      }
+      RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
 
       updateIncomingStats();
     }
@@ -306,9 +305,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     // entire incoming recods has been unnested. If the entire records has been
     // unnested, we return EMIT and any blocking operators in the pipeline will
     // unblock.
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-    }
+    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
     return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
   }
 
@@ -420,14 +417,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, memoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, memoryManager.getTotalOutputRecords());
 
-    logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch bytes : {},  avg row bytes : {}, record count : {}",
-        memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(),
-        memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords());
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch bytes : {},  avg row bytes : {}, record count : {}",
-        memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(),
-        memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "incoming aggregate: batch count : %d, avg batch bytes : %d,  avg row bytes : %d, record count : %d",
+      memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(),
+      memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords());
 
+    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+      "outgoing aggregate: batch count : %d, avg batch bytes : %d,  avg row bytes : %d, record count : %d",
+      memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(),
+      memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
   }
 
   private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index dfa1424..c38de2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 
 public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
@@ -41,6 +42,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected final T popConfig;
   protected final FragmentContext context;
   protected final OperatorContext oContext;
+  protected final RecordBatchStatsContext batchStatsContext;
   protected final OperatorStats stats;
   protected final boolean unionTypeEnabled;
   protected BatchState state;
@@ -58,6 +60,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = oContext;
+    this.batchStatsContext = new RecordBatchStatsContext(context, oContext);
     stats = oContext.getStats();
     container = new VectorContainer(this.oContext.getAllocator());
     if (buildSchema) {
@@ -241,4 +244,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   public VectorContainer getContainer() {
     return  container;
   }
+
+  public RecordBatchStatsContext getRecordBatchStatsContext() {
+    return batchStatsContext;
+  }
+
+  public boolean isRecordBatchStatsLoggingEnabled() {
+    return batchStatsContext.isEnableBatchSzLogging();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
index 4a0e1e8..14b3132 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -85,10 +85,8 @@ final class OverflowSerDeUtil {
     // Allocate the required memory to serialize the overflow fields
     final DrillBuf buffer = allocator.buffer(bufferLength);
 
-    if (batchStatsContext.isEnableBatchSzLogging()) {
-      final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength);
-      RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
-    }
+    RecordBatchStats.logRecordBatchStats(batchStatsContext,
+      "Allocated a buffer of length [%d] to handle overflow", bufferLength);
 
     // Create the result object
     final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 0b24244..c61607e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.util.record;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -134,25 +135,79 @@ public final class RecordBatchStats {
     }
   }
 
+  /** Indicates whether a record batch is Input or Output */
+  public enum RecordBatchIOType {
+    INPUT ("incoming"),
+    INPUT_RIGHT ("incoming right"),
+    INPUT_LEFT ("incoming left"),
+    OUTPUT ("outgoing"),
+    PASSTHROUGH ("passthrough");
+
+    private final String ioTypeString;
+
+    private RecordBatchIOType(String ioTypeString) {
+      this.ioTypeString = ioTypeString;
+    }
+
+    /**
+     * @return IO Type string
+     */
+    public String getIOTypeString() {
+      return ioTypeString;
+    }
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    String sourceId,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logRecordBatchStats(ioType, sourceId, new RecordBatchSizer(recordBatch), batchStatsContext);
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logRecordBatchStats(ioType, null, new RecordBatchSizer(recordBatch), batchStatsContext);
+  }
+
   /**
-   * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)}
+   * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)}
    */
-  public static void logRecordBatchStats(RecordBatch recordBatch,
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    RecordBatchSizer recordBatchSizer,
     RecordBatchStatsContext batchStatsContext) {
 
-    logRecordBatchStats(null, recordBatch, batchStatsContext);
+    logRecordBatchStats(ioType, null, recordBatchSizer, batchStatsContext);
   }
 
   /**
    * Logs record batch statistics for the input record batch (logging happens only
    * when record statistics logging is enabled).
    *
+   * @param ioType whether a record batch is an input or/and output
    * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
+   * @param batchSizer contains batch sizing information
    * @param batchStatsContext batch stats context object
    */
-  public static void logRecordBatchStats(String sourceId,
-    RecordBatch recordBatch,
+  public static void logRecordBatchStats(RecordBatchIOType ioType,
+    String sourceId,
+    RecordBatchSizer batchSizer,
     RecordBatchStatsContext batchStatsContext) {
 
     if (!batchStatsContext.isEnableBatchSzLogging()) {
@@ -161,7 +216,7 @@ public final class RecordBatchStats {
 
     final String statsId = batchStatsContext.getContextOperatorId();
     final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
-    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+    final String msg = printRecordBatchStats(statsId, ioType, sourceId, batchSizer, verbose);
 
     logBatchStatsMsg(batchStatsContext, msg, false);
   }
@@ -170,7 +225,6 @@ public final class RecordBatchStats {
    * Logs a generic batch statistics message
    *
    * @param message log message
-   * @param batchStatsLogging
    * @param batchStatsContext batch stats context object
    */
   public static void logRecordBatchStats(String message,
@@ -184,6 +238,25 @@ public final class RecordBatchStats {
   }
 
   /**
+   * Logs a generic batch statistics message
+   *
+   * @param batchStatsContext batch stats context object
+   * @param format a string format as in {@link String#format} method
+   * @param args format's arguments
+   */
+  public static void logRecordBatchStats(RecordBatchStatsContext batchStatsContext,
+    String format,
+    Object...args) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    final String message = String.format(format, args);
+    logBatchStatsMsg(batchStatsContext, message, true);
+  }
+
+  /**
    * @param allocator dumps allocator statistics
    * @return string with allocator statistics
    */
@@ -212,27 +285,30 @@ public final class RecordBatchStats {
    * Constructs record batch statistics for the input record batch
    *
    * @param stats instance identifier
+   * @param ioType whether a record batch is an input or/and output
    * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
+   * @param batchSizer contains batch sizing information
    * @param verbose whether to include fine-grained stats
    *
    * @return a string containing the record batch statistics
    */
   private static String printRecordBatchStats(String statsId,
+    RecordBatchIOType ioType,
     String sourceId,
-    RecordBatch recordBatch,
+    RecordBatchSizer batchSizer,
     boolean verbose) {
 
-    final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch);
     final StringBuilder msg = new StringBuilder();
 
     msg.append(BATCH_STATS_PREFIX);
-    msg.append(" Originator: {");
+    msg.append(" Operator: {");
     msg.append(statsId);
     if (sourceId != null) {
       msg.append(':');
       msg.append(sourceId);
     }
+    msg.append("}, IO Type: {");
+    msg.append(toString(ioType));
     msg.append("}, Batch size: {");
     msg.append( "  Records: " );
     msg.append(batchSizer.rowCount());
@@ -269,7 +345,8 @@ public final class RecordBatchStats {
     boolean includePrefix) {
 
     if (includePrefix) {
-      msg = BATCH_STATS_PREFIX + '\t' + msg;
+      final String statsId = batchStatsContext.getContextOperatorId();
+      msg = BATCH_STATS_PREFIX + " Operator: {" + statsId + "} " + msg;
     }
 
     if (batchStatsContext.useInfoLevelLogging()) {
@@ -279,4 +356,19 @@ public final class RecordBatchStats {
     }
   }
 
+  private static String toString(RecordBatchIOType ioType) {
+    Preconditions.checkNotNull(ioType, "The record batch IO type cannot be null");
+
+    switch (ioType) {
+      case INPUT: return "incoming";
+      case INPUT_RIGHT: return "incoming right";
+      case INPUT_LEFT: return "incoming left";
+      case OUTPUT: return "outgoing";
+      case PASSTHROUGH: return "passthrough";
+
+      default: throw new RuntimeException("Unexpected record batch IO type..");
+    }
+
+  }
+
 }
\ No newline at end of file