You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/13 03:45:13 UTC

[GitHub] sohami closed pull request #1355: DRILL-6560: Enhanced the batch statistics logging enablement

sohami closed pull request #1355: DRILL-6560: Enhanced the batch statistics logging enablement
URL: https://github.com/apache/drill/pull/1355
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it would not otherwise remain visible on GitHub after the merge):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4c840a41398..d0842d2996d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -711,5 +711,8 @@ public static String bootDefaultFor(String name) {
   public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
   public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);
 
+  /** Controls the list of operators for which batch sizing stats should be enabled */
+  public static final String STATS_LOGGING_BATCH_OPERATOR_OPTION = "drill.exec.stats.logging.enabled_operators";
+  public static final StringValidator STATS_LOGGING_BATCH_OPERATOR_VALIDATOR = new StringValidator(STATS_LOGGING_BATCH_OPERATOR_OPTION);
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 09e785e259f..4a2cd2c58f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -82,7 +82,7 @@
   private final BufferAllocator allocator;
   private final List<Map<String, String>> implicitColumnList;
   private String currentReaderClassName;
-  private final RecordBatchStatsContext batchStatsLogging;
+  private final RecordBatchStatsContext batchStatsContext;
 
   /**
    *
@@ -121,7 +121,7 @@ public ScanBatch(FragmentContext context,
       this.implicitColumnList = implicitColumnList;
       addImplicitVectors();
       currentReader = null;
-      batchStatsLogging = new RecordBatchStatsContext(context, oContext);
+      batchStatsContext = new RecordBatchStatsContext(context, oContext);
     } finally {
       oContext.getStats().stopProcessing();
     }
@@ -304,12 +304,7 @@ private void logRecordBatchStats() {
       return; // NOOP
     }
 
-    RecordBatchStats.logRecordBatchStats(
-      batchStatsLogging.getContextOperatorId(),
-      getFQNForLogging(MAX_FQN_LENGTH),
-      this,
-      batchStatsLogging,
-      logger);
+    RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
   }
 
   /** Might truncate the FQN if too long */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a16bb4d95cd..5ee3825bd28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -46,8 +46,8 @@
 
 /**
  *  <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
- *  Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
- *  persist between restarts.
+ * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
+ * persist between restarts.
  *  </p>
  *
  *  <p> All the system options are externalized into conf file. While adding a new system option
@@ -235,6 +235,7 @@
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index e1ca73f1776..e33a5050f86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -35,6 +35,7 @@
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -240,7 +241,7 @@ public boolean useBulkReader() {
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
     schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
-    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead);
+    batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext));
 
     logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
         hadoopPath.toUri().getPath());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 7bdc33ef5cf..1fb224d6fac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -34,11 +34,11 @@
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowState;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.FieldOverflowStateContainer;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager.VarLenColumnBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats;
 import org.apache.drill.exec.vector.ValueVector;
 
 /** Class which handles reading a batch of rows from a set of variable columns */
 public class VarLenBinaryReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLenBinaryReader.class);
 
   final ParquetRecordReader parentReader;
   final RecordBatchSizerManager batchSizer;
@@ -170,7 +170,8 @@ private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int
 
         // Lazy initialization
         if (builder == null) {
-          builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator());
+          builder = RecordBatchOverflow.newBuilder(parentReader.getOperatorContext().getAllocator(),
+            batchSizer.getBatchStatsContext());
         }
 
         final int numOverflowValues = columnStat.numValuesRead - batchNumRecords;
@@ -181,7 +182,7 @@ private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int
     // Register batch overflow data with the record batch sizer manager (if any)
     if (builder != null) {
       Map<String, FieldOverflowStateContainer> overflowContainerMap = parentReader.batchSizerMgr.getFieldOverflowMap();
-      Map<String, FieldOverflowDefinition> overflowDefMap           = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
+      Map<String, FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
 
       for (Map.Entry<String, FieldOverflowDefinition> entry : overflowDefMap.entrySet()) {
         FieldOverflowStateContainer overflowStateContainer = new FieldOverflowStateContainer(entry.getValue(), null);
@@ -197,9 +198,9 @@ private void reorderVLColumns() {
     // Finally, re-order the variable length columns since an overflow occurred
     Collections.sort(orderedColumns, comparator);
 
-    if (logger.isDebugEnabled()) {
-      boolean isFirstValue    = true;
-      final StringBuilder msg = new StringBuilder(RecordBatchSizerManager.BATCH_STATS_PREFIX);
+    if (batchSizer.getBatchStatsContext().isEnableBatchSzLogging()) {
+      boolean isFirstValue = true;
+      final StringBuilder msg = new StringBuilder();
       msg.append(": Dumping the variable length columns read order: ");
 
       for (VLColumnContainer container : orderedColumns) {
@@ -212,7 +213,7 @@ private void reorderVLColumns() {
       }
       msg.append('.');
 
-      logger.debug(msg.toString());
+      RecordBatchStats.logRecordBatchStats(msg.toString(), batchSizer.getBatchStatsContext());
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
index c5428034915..4a0e1e81ea6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -26,6 +26,8 @@
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowEntry;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowContainer;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.RecordOverflowDefinition;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.UInt1Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
 
@@ -44,7 +46,6 @@
  * </ul>
  */
 final class OverflowSerDeUtil {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OverflowSerDeUtil.class);
 
   /**
    * Serializes a collection of overflow fields into a memory buffer:
@@ -56,10 +57,12 @@
    *
    * @param fieldOverflowEntries input collection of field overflow entries
    * @param allocator buffer allocator
+   * @param batchStatsContext batch statistics context object
    * @return record overflow container; null if the input buffer is empty
    */
   static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowEntries,
-    BufferAllocator allocator) {
+    BufferAllocator allocator,
+    RecordBatchStatsContext batchStatsContext) {
 
     if (fieldOverflowEntries == null || fieldOverflowEntries.isEmpty()) {
       return null;
@@ -82,8 +85,9 @@ static RecordOverflowContainer serialize(List<FieldOverflowEntry> fieldOverflowE
     // Allocate the required memory to serialize the overflow fields
     final DrillBuf buffer = allocator.buffer(bufferLength);
 
-    if (logger.isDebugEnabled()) {
-      logger.debug(String.format("Allocated a buffer of length %d to handle overflow", bufferLength));
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength);
+      RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
     }
 
     // Create the result object
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
index 76422ae8b3e..462ddf08d4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchOverflow.java
@@ -24,6 +24,7 @@
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
@@ -39,10 +40,11 @@
 
   /**
    * @param allocator buffer allocator
+   * @param batchStatsContext batch statistics context
    * @return new builder object
    */
-  public static Builder newBuilder(BufferAllocator allocator) {
-    return new Builder(allocator);
+  public static Builder newBuilder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) {
+    return new Builder(allocator, batchStatsContext);
   }
 
   /**
@@ -75,13 +77,17 @@ private RecordBatchOverflow(RecordOverflowDefinition recordOverflowDef,
     private final List<FieldOverflowEntry> fieldOverflowEntries = new ArrayList<FieldOverflowEntry>();
     /** Buffer allocator */
     private final BufferAllocator allocator;
+    /** Batch statistics context */
+    private final RecordBatchStatsContext batchStatsContext;
 
     /**
      * Build class to construct a {@link RecordBatchOverflow} object.
      * @param allocator buffer allocator
+     * @param batchStatsContext batch statistics context
      */
-    private Builder(BufferAllocator allocator) {
+    private Builder(BufferAllocator allocator, RecordBatchStatsContext batchStatsContext) {
       this.allocator = allocator;
+      this.batchStatsContext = batchStatsContext;
     }
 
     /**
@@ -101,9 +107,8 @@ public void addFieldOverflow(ValueVector vector, int firstValueIdx, int numValue
      * @return a new built {link BatchRecordOverflow} object instance
      */
     public RecordBatchOverflow build() {
-      RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator);
-      RecordBatchOverflow result                =
-        new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
+      RecordOverflowContainer overflowContainer = OverflowSerDeUtil.serialize(fieldOverflowEntries, allocator, batchStatsContext);
+      RecordBatchOverflow result = new RecordBatchOverflow(overflowContainer.recordOverflowDef, allocator);
 
       return result;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index 01644f79afa..5ddcf7e9205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -30,6 +30,8 @@
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetSchema;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow.FieldOverflowDefinition;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -39,7 +41,7 @@
  */
 public final class RecordBatchSizerManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizerManager.class);
-  public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
+
 
   /** Minimum column memory size */
   private static final int MIN_COLUMN_MEMORY_SZ = VarLenColumnBulkInput.getMinVLColumnMemorySize();
@@ -78,6 +80,9 @@
    */
   private Map<String, FieldOverflowStateContainer> fieldOverflowMap = CaseInsensitiveMap.newHashMap();
 
+  /** For controlling batch statistics logging */
+  private final RecordBatchStatsContext batchStatsContext;
+
   /**
    * Constructor.
    *
@@ -87,7 +92,8 @@
    */
   public RecordBatchSizerManager(OptionManager options,
     ParquetSchema schema,
-    long totalRecordsToRead) {
+    long totalRecordsToRead,
+    RecordBatchStatsContext batchStatsContext) {
 
     this.schema = schema;
     this.totalRecordsToRead = totalRecordsToRead;
@@ -97,6 +103,7 @@ public RecordBatchSizerManager(OptionManager options,
     this.maxRecordsPerBatch = this.configRecordsPerBatch;
     this.recordsPerBatch = this.configRecordsPerBatch;
     this.overflowOptimizer = new BatchOverflowOptimizer(columnMemoryInfoMap);
+    this.batchStatsContext = batchStatsContext;
   }
 
   /**
@@ -130,6 +137,13 @@ public ParquetSchema getSchema() {
     return schema;
   }
 
+  /**
+   * @return batch statistics context
+   */
+  public RecordBatchStatsContext getBatchStatsContext() {
+    return batchStatsContext;
+  }
+
   /**
    * Allocates value vectors for the current batch.
    *
@@ -282,10 +296,9 @@ private int normalizeNumRecordsPerBatch() {
       normalizedNumRecords = (int) totalRecordsToRead;
     }
 
-    if (logger.isDebugEnabled()) {
-      final String message = String.format("%s: The Parquet reader number of record(s) has been set to [%d]",
-        BATCH_STATS_PREFIX, normalizedNumRecords);
-      logger.debug(message);
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String message = String.format("The Parquet reader number of record(s) has been set to [%d]", normalizedNumRecords);
+      RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
     }
 
     return normalizedNumRecords;
@@ -319,10 +332,9 @@ private int normalizeMemorySizePerBatch() {
       logger.warn(message);
     }
 
-    if (logger.isDebugEnabled()) {
-      final String message = String.format("%s: The Parquet reader batch memory has been set to [%d] byte(s)",
-        BATCH_STATS_PREFIX, normalizedMemorySize);
-      logger.debug(message);
+    if (batchStatsContext.isEnableBatchSzLogging()) {
+      final String message = String.format("The Parquet reader batch memory has been set to [%d] byte(s)", normalizedMemorySize);
+      RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
     }
 
     return normalizedMemorySize;
@@ -370,13 +382,12 @@ private void assignColumnsBatchMemory() {
     assignFineGrainedMemoryQuota();
 
     // log the new record batch if it changed
-    if (logger.isDebugEnabled()) {
+    if (batchStatsContext.isEnableBatchSzLogging()) {
       assert recordsPerBatch <= maxRecordsPerBatch;
 
       if (originalRecordsPerBatch != recordsPerBatch) {
-        final String message = String.format("%s: The Parquet records per batch [%d] has been decreased to [%d]",
-          BATCH_STATS_PREFIX, originalRecordsPerBatch, recordsPerBatch);
-        logger.debug(message);
+        final String message = String.format("The Parquet records per batch [%d] has been decreased to [%d]", originalRecordsPerBatch, recordsPerBatch);
+        RecordBatchStats.logRecordBatchStats(message, batchStatsContext);
       }
 
       // Now dump the per column memory quotas
@@ -504,12 +515,12 @@ private double computeNeededMemoryRatio(MemoryRequirementContainer requiredMemor
   }
 
   private void dumpColumnMemoryQuotas() {
-    StringBuilder msg = new StringBuilder(BATCH_STATS_PREFIX);
+    StringBuilder msg = new StringBuilder();
     msg.append(": Field Quotas:\n\tName\tType\tPrec\tQuota\n");
 
     for (ColumnMemoryInfo columnInfo : columnMemoryInfoMap.values()) {
       msg.append("\t");
-      msg.append(BATCH_STATS_PREFIX);
+      msg.append(RecordBatchStats.BATCH_STATS_PREFIX);
       msg.append("\t");
       msg.append(columnInfo.columnMeta.getField().getName());
       msg.append("\t");
@@ -521,7 +532,7 @@ private void dumpColumnMemoryQuotas() {
       msg.append("\n");
     }
 
-    logger.debug(msg.toString());
+    RecordBatchStats.logRecordBatchStats(msg.toString(), batchStatsContext);
   }
 
   private  static void printType(MaterializedField field, StringBuilder msg) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 8b213a815af..0b242440929 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.util.record;
 
-import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -25,7 +24,6 @@
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
 import org.apache.drill.exec.record.RecordBatchSizer.ColumnSize;
@@ -34,13 +32,14 @@
  * Utility class to capture key record batch statistics.
  */
 public final class RecordBatchStats {
+  // Logger
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStats.class);
+
   /** A prefix for all batch stats to simplify search */
   public static final String BATCH_STATS_PREFIX = "BATCH_STATS";
 
   /** Helper class which loads contextual record batch logging options */
   public static final class RecordBatchStatsContext {
-    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchStatsContext.class);
-
     /** batch size logging for all readers */
     private final boolean enableBatchSzLogging;
     /** Fine grained batch size logging */
@@ -52,8 +51,17 @@
      * @param options options manager
      */
     public RecordBatchStatsContext(FragmentContext context, OperatorContext oContext) {
-      enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
-      enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+      final boolean operatorEnabledForStatsLogging = isBatchStatsEnabledForOperator(context, oContext);
+
+      if (operatorEnabledForStatsLogging) {
+        enableBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_BATCH_SIZE_OPTION);
+        enableFgBatchSzLogging = context.getOptions().getBoolean(ExecConstants.STATS_LOGGING_FG_BATCH_SIZE_OPTION);
+
+      } else {
+        enableBatchSzLogging = false;
+        enableFgBatchSzLogging = false;
+      }
+
       contextOperatorId = new StringBuilder()
         .append(getQueryId(context))
         .append(":")
@@ -100,6 +108,104 @@ private String getQueryId(FragmentContext _context) {
       }
       return "NA";
     }
+
+    private boolean isBatchStatsEnabledForOperator(FragmentContext context, OperatorContext oContext) {
+      // The configuration can select what operators should log batch statistics
+      final String statsLoggingOperator = context.getOptions().getString(ExecConstants.STATS_LOGGING_BATCH_OPERATOR_OPTION).toUpperCase();
+      final String allOperatorsStr = "ALL";
+
+      // All operators are allowed to log batch statistics
+      if (allOperatorsStr.equals(statsLoggingOperator)) {
+        return true;
+      }
+
+      // No, only a select few are allowed; syntax: operator-id-1,operator-id-2,..
+      final String[] operators = statsLoggingOperator.split(",");
+      final String operatorId = oContext.getStats().getId().toUpperCase();
+
+      for (int idx = 0; idx < operators.length; idx++) {
+        // We use "contains" because the operator identifier is a composite string; e.g., 3:[PARQUET_ROW_GROUP_SCAN]
+        if (operatorId.contains(operators[idx].trim())) {
+          return true;
+        }
+      }
+
+      return false;
+    }
+  }
+
+  /**
+   * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)}
+   */
+  public static void logRecordBatchStats(RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    logRecordBatchStats(null, recordBatch, batchStatsContext);
+  }
+
+  /**
+   * Logs record batch statistics for the input record batch (logging happens only
+   * when record statistics logging is enabled).
+   *
+   * @param sourceId optional source identifier for scanners
+   * @param recordBatch a set of records
+   * @param batchStatsContext batch stats context object
+   */
+  public static void logRecordBatchStats(String sourceId,
+    RecordBatch recordBatch,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    final String statsId = batchStatsContext.getContextOperatorId();
+    final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
+    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+
+    logBatchStatsMsg(batchStatsContext, msg, false);
+  }
+
+  /**
+   * Logs a generic batch statistics message
+   *
+   * @param message log message
+   * @param batchStatsLogging
+   * @param batchStatsContext batch stats context object
+   */
+  public static void logRecordBatchStats(String message,
+    RecordBatchStatsContext batchStatsContext) {
+
+    if (!batchStatsContext.isEnableBatchSzLogging()) {
+      return; // NOOP
+    }
+
+    logBatchStatsMsg(batchStatsContext, message, true);
+  }
+
+  /**
+   * @param allocator dumps allocator statistics
+   * @return string with allocator statistics
+   */
+  public static String printAllocatorStats(BufferAllocator allocator) {
+    StringBuilder msg = new StringBuilder();
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": dumping allocator statistics:\n");
+    msg.append(BATCH_STATS_PREFIX);
+    msg.append(": ");
+    msg.append(allocator.toString());
+
+    return msg.toString();
+  }
+
+// ----------------------------------------------------------------------------
+// Local Implementation
+// ----------------------------------------------------------------------------
+
+  /**
+   * Disabling class object instantiation.
+   */
+  private RecordBatchStats() {
   }
 
   /**
@@ -112,7 +218,7 @@ private String getQueryId(FragmentContext _context) {
    *
    * @return a string containing the record batch statistics
    */
-  public static String printRecordBatchStats(String statsId,
+  private static String printRecordBatchStats(String statsId,
     String sourceId,
     RecordBatch recordBatch,
     boolean verbose) {
@@ -158,68 +264,19 @@ public static String printRecordBatchStats(String statsId,
     return msg.toString();
   }
 
-  /**
-   * Logs record batch statistics for the input record batch (logging happens only
-   * when record statistics logging is enabled).
-   *
-   * @param stats instance identifier
-   * @param sourceId optional source identifier for scanners
-   * @param recordBatch a set of records
-   * @param verbose whether to include fine-grained stats
-   * @param logger Logger where to print the record batch statistics
-   */
-  public static void logRecordBatchStats(String statsId,
-    String sourceId,
-    RecordBatch recordBatch,
-    RecordBatchStatsContext batchStatsLogging,
-    org.slf4j.Logger logger) {
+  private static void logBatchStatsMsg(RecordBatchStatsContext batchStatsContext,
+    String msg,
+    boolean includePrefix) {
 
-    if (!batchStatsLogging.isEnableBatchSzLogging()) {
-      return; // NOOP
+    if (includePrefix) {
+      msg = BATCH_STATS_PREFIX + '\t' + msg;
     }
 
-    final boolean verbose = batchStatsLogging.isEnableFgBatchSzLogging();
-    final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
-
-    if (batchStatsLogging.useInfoLevelLogging()) {
+    if (batchStatsContext.useInfoLevelLogging()) {
       logger.info(msg);
     } else {
       logger.debug(msg);
     }
   }
 
-  /**
-   * Prints a materialized field type
-   * @param field materialized field
-   * @param msg string builder where to append the field type
-   */
-  public static void printFieldType(MaterializedField field, StringBuilder msg) {
-    final MajorType type = field.getType();
-
-    msg.append(type.getMinorType().name());
-    msg.append(':');
-    msg.append(type.getMode().name());
-  }
-
-  /**
-   * @param allocator dumps allocator statistics
-   * @return string with allocator statistics
-   */
-  public static String printAllocatorStats(BufferAllocator allocator) {
-    StringBuilder msg = new StringBuilder();
-    msg.append(BATCH_STATS_PREFIX);
-    msg.append(": dumping allocator statistics:\n");
-    msg.append(BATCH_STATS_PREFIX);
-    msg.append(": ");
-    msg.append(allocator.toString());
-
-    return msg.toString();
-  }
-
-  /**
-   * Disabling class object instantiation.
-   */
-  private RecordBatchStats() {
-  }
-
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b0cc209cd0a..19e779d16dc 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -488,6 +488,7 @@ drill.exec.options: {
     exec.udf.use_dynamic: true,
     drill.exec.stats.logging.batch_size: false,
     drill.exec.stats.logging.fine_grained.batch_size: false,
+    drill.exec.stats.logging.enabled_operators: all,
     new_view_default_permissions: 700,
     org.apache.drill.exec.compile.ClassTransformer.scalar_replacement: "try",
     planner.add_producer_consumer: false,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services