You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/06/12 06:15:49 UTC

[drill] branch master updated: DRILL-6478: enhance debug logs for batch sizing

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new cbcb59d  DRILL-6478: enhance debug logs for batch sizing
cbcb59d is described below

commit cbcb59df5ccc655e374ac6244c572088153aedc6
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Thu Jun 7 14:04:47 2018 -0700

    DRILL-6478: enhance debug logs for batch sizing
    
    closes #1310
---
 .../physical/impl/flatten/FlattenRecordBatch.java  | 23 +++++------
 .../exec/physical/impl/join/HashJoinBatch.java     | 37 +++++++++---------
 .../exec/physical/impl/join/MergeJoinBatch.java    | 44 +++++++++++-----------
 .../physical/impl/union/UnionAllRecordBatch.java   | 25 ++++++++++++
 4 files changed, 77 insertions(+), 52 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index be44c94..2f92d52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -157,9 +157,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       // i.e. all rows fit within memory budget.
       setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
 
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
-      }
+      logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
 
       updateIncomingStats();
     }
@@ -171,6 +169,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
+
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   @Override
@@ -263,7 +263,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     flattenMemoryManager.updateOutgoingStats(outputRecords);
 
     if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
     }
 
     // Get the final outcome based on hasRemainder since that will determine if all the incoming records were
@@ -516,14 +516,15 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
     stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());
 
-    logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
-      flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
-
-    logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
-      flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
+        flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
 
+      logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
+        flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
+    }
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 4267077..428a47e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -242,10 +242,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     batchMemoryManager.update(LEFT_INDEX, 0);
     batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming left:\n {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-      logger.debug("BATCH_STATS, incoming right:\n {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
-    }
+    logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+    logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
       state = BatchState.STOP;
@@ -358,7 +356,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
         batchMemoryManager.updateOutgoingStats(outputRecords);
         if (logger.isDebugEnabled()) {
-          logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+          logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
         }
 
         /* We are here because of one the following
@@ -890,6 +888,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   /**
@@ -1004,21 +1003,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
     updateMetrics();
 
-    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    }
 
     this.cleanup();
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index a5c2ae7..62967a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -122,9 +122,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     @Override
     public void update(int inputIndex) {
       status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
-      if (logger.isDebugEnabled()) {
-        logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
-      }
+      logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
     }
   }
 
@@ -132,8 +130,10 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     super(popConfig, context, true, left, right);
 
     // Instantiate the batch memory manager
-    final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right);
+    final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);
+
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
 
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not support cartesian join.  This join operator was configured with 0 conditions");
@@ -271,7 +271,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
     }
 
     if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
     }
 
     batchMemoryManager.updateOutgoingStats(getRecordCount());
@@ -281,21 +281,23 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
   public void close() {
     updateBatchMemoryManagerStats();
 
-    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    }
 
     super.close();
     leftIterator.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index f4c1900..36b9c9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -39,9 +39,11 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
@@ -74,6 +76,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   @Override
@@ -168,6 +171,10 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
     batchStatus.recordsProcessed += recordCount;
     batchMemoryManager.updateOutgoingStats(recordCount);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    }
+
     if (callBack.getSchemaChangedAndReset()) {
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
@@ -361,6 +368,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
         if (topStatus.prefetched) {
           topStatus.prefetched = false;
           batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+          logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
+            batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
           return Pair.of(topStatus.outcome, topStatus);
         } else {
 
@@ -378,6 +387,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
             topStatus.recordsProcessed = 0;
             topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
             batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
+            logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
+              batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
             return Pair.of(outcome, topStatus);
           case OUT_OF_MEMORY:
           case STOP:
@@ -409,6 +420,20 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   public void close() {
     super.close();
     updateBatchMemoryManagerStats();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+        batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    }
   }
 
 }

-- 
To stop receiving notification emails like this one, please contact
timothyfarkas@apache.org.