You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/12/04 12:55:59 UTC

[drill] 05/11: DRILL-7450: Improve performance for ANALYZE command

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 20293b63c0bb559ae35d57f7cb1ab7fa24e9ee6d
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Fri Nov 22 19:53:08 2019 +0200

    DRILL-7450: Improve performance for ANALYZE command
    
    - Implement two-phase aggregation for the lowest metadata aggregate to optimize performance
    - Allow using complex functions with hash aggregate
    - Use hash aggregation for PHASE_1of2 for ANALYZE to reduce memory usage and avoid sorting non-aggregated data
    - Add sort above hash aggregation to fix correctness of merge exchange and stream aggregate
    
    closes #1907
---
 docs/dev/MetastoreAnalyze.md                       |  61 ++++-
 .../org/apache/drill/exec/expr/IsPredicate.java    |   2 +-
 .../drill/exec/expr/fn/DrillAggFuncHolder.java     |   4 +-
 .../expr/fn/DrillComplexWriterAggFuncHolder.java   |  40 ++-
 .../apache/drill/exec/expr/fn/DrillFuncHolder.java |  78 ++++--
 .../drill/exec/metastore/ColumnNamesOptions.java   |  80 ++++++
 .../metastore/analyze/AnalyzeFileInfoProvider.java |  24 +-
 .../metastore/analyze/AnalyzeInfoProvider.java     |  20 +-
 .../analyze/AnalyzeParquetInfoProvider.java        |  18 +-
 .../analyze/FileMetadataInfoCollector.java         |   3 +-
 .../analyze/MetadataAggregateContext.java          |  24 ++
 .../base/AbstractGroupScanWithMetadata.java        |  36 ++-
 .../exec/physical/config/HashToMergeExchange.java  |   3 +-
 ...MetadataAggPOP.java => MetadataHashAggPOP.java} |  26 +-
 ...tadataAggPOP.java => MetadataStreamAggPOP.java} |  19 +-
 .../exec/physical/impl/aggregate/HashAggBatch.java | 136 ++++++---
 .../physical/impl/aggregate/HashAggTemplate.java   |  15 +-
 .../physical/impl/aggregate/StreamingAggBatch.java |  11 +-
 .../physical/impl/flatten/FlattenRecordBatch.java  |   2 +-
 ...aAggBatch.java => MetadataAggregateHelper.java} | 173 +++++++-----
 .../impl/metadata/MetadataControllerBatch.java     |  44 +--
 .../impl/metadata/MetadataHandlerBatch.java        |  49 ++--
 .../impl/metadata/MetadataHashAggBatch.java        |  56 ++++
 ...eator.java => MetadataHashAggBatchCreator.java} |   8 +-
 .../impl/metadata/MetadataStreamAggBatch.java      |  62 +++++
 ...tor.java => MetadataStreamAggBatchCreator.java} |   8 +-
 .../physical/impl/project/ProjectRecordBatch.java  |   4 +-
 .../physical/impl/validate/BatchValidator.java     |   8 +
 .../resultSet/model/single/BaseReaderBuilder.java  |   2 +-
 .../exec/physical/rowSet/RowSetFormatter.java      |  13 +-
 .../apache/drill/exec/planner/PlannerPhase.java    |   2 +
 .../logical/ConvertCountToDirectScanRule.java      |   2 +-
 .../ConvertMetadataAggregateToDirectScanRule.java  | 271 ++++++++++++++++++
 .../planner/physical/DrillDistributionTrait.java   |  81 +++++-
 .../drill/exec/planner/physical/HashAggPrule.java  |  14 +-
 .../drill/exec/planner/physical/HashPrelUtil.java  |  37 +--
 .../exec/planner/physical/MetadataAggPrule.java    | 202 ++++++++++++--
 .../planner/physical/MetadataHandlerPrule.java     |   2 +-
 ...tadataAggPrel.java => MetadataHashAggPrel.java} |  22 +-
 ...dataAggPrel.java => MetadataStreamAggPrel.java} |  21 +-
 .../drill/exec/planner/physical/PrelUtil.java      |  27 +-
 .../sql/handlers/MetastoreAnalyzeTableHandler.java |  42 +--
 .../apache/drill/exec/store/ColumnExplorer.java    |  25 +-
 .../store/parquet/AbstractParquetGroupScan.java    |  98 +++++--
 .../ParquetFileTableMetadataProviderBuilder.java   |   4 +
 .../exec/store/pojo/DynamicPojoRecordReader.java   |  25 +-
 .../drill/exec/fn/impl/TestAggregateFunction.java  |   2 +-
 .../drill/exec/fn/impl/TestAggregateFunctions.java | 206 ++++++++++----
 .../physical/impl/agg/TestAggWithAnyValue.java     | 304 ++++++++++++++++-----
 .../physical/impl/agg/TestHashAggEmitOutcome.java  | 205 +++++++-------
 .../drill/exec/sql/TestMetastoreCommands.java      | 285 +++++++++++++++----
 .../test/resources/functions/test_covariance.json  |   6 +-
 .../resources/functions/test_logical_aggr.json     |   6 +-
 .../src/main/codegen/templates/ComplexWriters.java |   6 +-
 .../expression/FunctionHolderExpression.java       |   2 +-
 .../common/logical/data/MetadataAggregate.java     |   2 +-
 56 files changed, 2225 insertions(+), 703 deletions(-)

diff --git a/docs/dev/MetastoreAnalyze.md b/docs/dev/MetastoreAnalyze.md
index ec27c50..23b2b32 100644
--- a/docs/dev/MetastoreAnalyze.md
+++ b/docs/dev/MetastoreAnalyze.md
@@ -94,24 +94,63 @@ Analyze command specific operators:
  - `MetadataControllerBatch` - responsible for converting obtained metadata, fetching absent metadata from the Metastore
   and storing resulting metadata into the Metastore.
 
-`MetastoreAnalyzeTableHandler` forms plan  depending on segments count in the following form:
+`MetastoreAnalyzeTableHandler` forms plan depending on segments count in the following form:
 
 ```
-MetadataControllerBatch
+MetadataControllerRel
   ...
-    MetadataHandlerBatch
-      MetadataAggBatch(dir0, ...)
-        MetadataHandlerBatch
-          MetadataAggBatch(dir0, dir1, ...)
-            MetadataHandlerBatch
-              MetadataAggBatch(dir0, dir1, fqn, ...)
-                Scan(DYNAMIC_STAR **, ANY fqn, ...)
+    MetadataHandlerRel
+      MetadataAggRel(dir0, ...)
+        MetadataHandlerRel
+          MetadataAggRel(dir0, dir1, ...)
+            MetadataHandlerRel
+              MetadataAggRel(dir0, dir1, fqn, ...)
+                DrillScanRel(DYNAMIC_STAR **, ANY fqn, ...)
 ```
 
-The lowest `MetadataAggBatch` creates required aggregate calls for every (or interesting only) table columns
+For the case when `ANALYZE` uses columns for which statistics is present in parquet metadata,
+`ConvertMetadataAggregateToDirectScanRule` rule will be applied to the 
+
+```
+MetadataAggRel(dir0, dir1, fqn, ...)
+  DrillScanRel(DYNAMIC_STAR **, ANY fqn, ...)
+```
+
+plan part and convert it to the `DrillDirectScanRel` populated with row group metadata for the case when `ANALYZE`
+was done for `ROW_GROUP` metadata level.
+For the case when metadata level in `ANALYZE` is not `ROW_GROUP`, the plan above will be converted into the following plan:
+
+```
+MetadataAggRel(metadataLevel=FILE (or another non-ROW_GROUP value), createNewAggregations=false)
+  DrillDirectScanRel
+```
+
+When it is converted into the physical plan, two-phase aggregation may be used for the case when incoming row
+count is greater than `planner.slice_target` option value. In this case, the lowest aggregation will be hash
+aggregation and it will be executed on the same minor fragments where the scan is produced. `Sort` operator will be
+placed above hash aggregation. `HashToMergeExchange` operator above `Sort` will send aggregated sorted data to the
+stream aggregate above.
+
+Example of the resulting plan:
+
+```
+MetadataControllerPrel
+  ...
+    MetadataStreamAggPrel(PHASE_1of1)
+      SortPrel
+        MetadataHandlerPrel
+          MetadataStreamAggPrel(PHASE_2of2)
+            HashToMergeExchangePrel
+              SortPrel
+                MetadataHashAggPrel(PHASE_1of2)
+                  ScanPrel
+```
+
+The lowest `MetadataStreamAggBatch` (or `MetadataHashAggBatch` for the case of two-phase aggregation with
+`MetadataStreamAggBatch` above) creates required aggregate calls for every (or interesting only) table columns
 and produces aggregations with grouping by segment columns that correspond to specific table level.
 `MetadataHandlerBatch` above it populates batch with additional information about metadata type and other info.
-`MetadataAggBatch` above merges metadata calculated before to obtain metadata for parent metadata levels and also stores incoming data to populate it to the Metastore later.
+`MetadataStreamAggBatch` above merges metadata calculated before to obtain metadata for parent metadata levels and also stores incoming data to populate it to the Metastore later.
 
 `MetadataControllerBatch` obtains all calculated metadata, converts it to the suitable form and sends it to the Metastore.
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
index 21e012e..aa0b9fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
@@ -71,7 +71,7 @@ public class IsPredicate<C extends Comparable<C>> extends LogicalExpressionBase
    * @param stat statistics object
    * @return <tt>true</tt> if the input stat object is null or has invalid statistics; false otherwise
    */
-  static boolean isNullOrEmpty(ColumnStatistics stat) {
+  public static boolean isNullOrEmpty(ColumnStatistics stat) {
     return stat == null
         || !stat.contains(ColumnStatisticsKind.MIN_VALUE)
         || !stat.contains(ColumnStatisticsKind.MAX_VALUE)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
index aac0abb..6203cd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
@@ -213,9 +213,7 @@ class DrillAggFuncHolder extends DrillFuncHolder {
         declareVarArgArray(g.getModel(), sub, inputVariables);
       }
       for (int i = 0; i < inputVariables.length; i++) {
-        ValueReference parameter = getAttributeParameter(i);
-        HoldingContainer inputVariable = inputVariables[i];
-        declare(sub, parameter, inputVariable.getHolder().type(), inputVariable.getHolder(), i);
+        declare(g.getModel(), sub, inputVariables[i], i);
       }
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
index 8439983..eec5848 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterAggFuncHolder.java
@@ -23,6 +23,8 @@ import org.apache.drill.common.expression.FunctionHolderExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate;
 import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
@@ -56,10 +58,18 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
 
   @Override
   public JVar[] renderStart(ClassGenerator<?> classGenerator, HoldingContainer[] inputVariables, FieldReference fieldReference) {
-    if (!classGenerator.getMappingSet().isHashAggMapping()) {  //Declare workspace vars for non-hash-aggregation.
-      JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+    JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
 
-      complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+    complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+
+    if (classGenerator.getMappingSet().isHashAggMapping()) {
+      // Default name is "col", if not passed in a reference name for the output vector.
+      String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
+      JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
+      classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+
+      return super.renderStart(classGenerator, inputVariables, fieldReference);
+    } else {  //Declare workspace vars for non-hash-aggregation.
       writerIdx = classGenerator.declareClassField("writerIdx", classGenerator.getModel()._ref(int.class));
       lastWriterIdx = classGenerator.declareClassField("lastWriterIdx", classGenerator.getModel()._ref(int.class));
       //Default name is "col", if not passed in a reference name for the output vector.
@@ -72,8 +82,6 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
       JVar[] workspaceJVars = declareWorkspaceVariables(classGenerator);
       generateBody(classGenerator, ClassGenerator.BlockType.SETUP, setup(), null, workspaceJVars, true);
       return workspaceJVars;
-    } else {
-      return super.renderStart(classGenerator, inputVariables, fieldReference);
     }
   }
 
@@ -84,26 +92,33 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
         getRegisteredNames()[0]));
 
     JBlock sub = new JBlock(true, true);
-    JBlock topSub = sub;
     JClass aggBatchClass = null;
 
     if (classGenerator.getCodeGenerator().getDefinition() == StreamingAggTemplate.TEMPLATE_DEFINITION) {
       aggBatchClass = classGenerator.getModel().ref(StreamingAggBatch.class);
+    } else if (classGenerator.getCodeGenerator().getDefinition() == HashAggTemplate.TEMPLATE_DEFINITION) {
+      aggBatchClass = classGenerator.getModel().ref(HashAggBatch.class);
     }
-    assert aggBatchClass != null : "ComplexWriterAggFuncHolder should only be used with Streaming Aggregate Operator";
 
     JExpression aggBatch = JExpr.cast(aggBatchClass, classGenerator.getMappingSet().getOutgoing());
 
     classGenerator.getSetupBlock().add(aggBatch.invoke("addComplexWriter").arg(complexWriter));
     // Only set the writer if there is a position change. Calling setPosition may cause underlying writers to allocate
     // new vectors, thereby, losing the previously stored values
-    JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
-    condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
-    condAssignCW.assign(lastWriterIdx, writerIdx);
+    if (classGenerator.getMappingSet().isHashAggMapping()) {
+      classGenerator.getEvalBlock().add(
+          complexWriter
+              .invoke("setPosition")
+              .arg(classGenerator.getMappingSet().getWorkspaceIndex()));
+    } else {
+      JBlock condAssignCW = classGenerator.getEvalBlock()._if(lastWriterIdx.ne(writerIdx))._then();
+      condAssignCW.add(complexWriter.invoke("setPosition").arg(writerIdx));
+      condAssignCW.assign(lastWriterIdx, writerIdx);
+    }
     sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
 
     // add the subblock after the out declaration.
-    classGenerator.getEvalBlock().add(topSub);
+    classGenerator.getEvalBlock().add(sub);
 
     addProtectedBlock(classGenerator, sub, add(), inputVariables, workspaceJVars, false);
     classGenerator.getEvalBlock().directStatement(String.format("//---- end of eval portion of %s function. ----//",
@@ -124,7 +139,8 @@ public class DrillComplexWriterAggFuncHolder extends DrillAggFuncHolder {
           JExpr._new(classGenerator.getHolderType(getReturnType())));
     }
     classGenerator.getEvalBlock().add(sub);
-    if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE) {
+    if (getReturnType().getMinorType() == TypeProtos.MinorType.LATE
+        && !classGenerator.getMappingSet().isHashAggMapping()) {
       sub.assignPlus(writerIdx, JExpr.lit(1));
     }
     addProtectedBlock(classGenerator, sub, output(), null, workspaceJVars, false);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
index 4d758f8..91f7cea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java
@@ -218,43 +218,21 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
         if (decConstInputOnly && !inputVariables[i].isConstant()) {
           continue;
         }
-
-        ValueReference parameter = getAttributeParameter(i);
-        HoldingContainer inputVariable = inputVariables[i];
-        if (parameter.isFieldReader() && ! inputVariable.isReader()
-            && ! Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) {
-          JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
-              inputVariable.getMajorType().getMode()));
-          JType fieldReadClass = getParamClass(g.getModel(), parameter, inputVariable.getHolder().type());
-          JInvocation reader = JExpr._new(singularReaderClass).arg(inputVariable.getHolder());
-          declare(sub, parameter, fieldReadClass, reader, i);
-        } else if (!parameter.isFieldReader() && inputVariable.isReader() && Types.isComplex(parameter.getType())) {
-          // For complex data-types (repeated maps/lists/dicts) the input to the aggregate will be a FieldReader. However, aggregate
-          // functions like ANY_VALUE, will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code, to map
-          // from FieldReader to respective Holder.
-          if (Types.isComplex(parameter.getType())) {
-            JType holderClass = getParamClass(g.getModel(), parameter, inputVariable.getHolder().type());
-            JAssignmentTarget holderVar = declare(sub, parameter, holderClass, JExpr._new(holderClass), i);
-            sub.assign(holderVar.ref("reader"), inputVariable.getHolder());
-          }
-        } else {
-          JExpression exprToAssign = inputVariable.getHolder();
-          if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
-            exprToAssign = exprToAssign.ref("reader");
-          }
-          declare(sub, parameter, inputVariable.getHolder().type(), exprToAssign, i);
-        }
+        declare(g.getModel(), sub, inputVariables[i], i);
       }
     }
 
     JVar[] internalVars = new JVar[workspaceJVars.length];
     for (int i = 0; i < workspaceJVars.length; i++) {
       if (decConstInputOnly) {
-        internalVars[i] = sub.decl(g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()), attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
+        internalVars[i] = sub.decl(
+            g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()),
+            attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
       } else {
-        internalVars[i] = sub.decl(g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()), attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
+        internalVars[i] = sub.decl(
+            g.getModel()._ref(attributes.getWorkspaceVars()[i].getType()),
+            attributes.getWorkspaceVars()[i].getName(), workspaceJVars[i]);
       }
-
     }
 
     Preconditions.checkNotNull(body);
@@ -267,6 +245,48 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder {
   }
 
   /**
+   * Declares attribute parameter which corresponds to specified {@code currentIndex}
+   * in specified {@code jBlock} considering its type.
+   *
+   * @param model         code model to generate the code
+   * @param jBlock        block of code to be populated
+   * @param inputVariable input variable for current function
+   * @param currentIndex  index of current parameter
+   */
+  protected void declare(JCodeModel model, JBlock jBlock,
+      HoldingContainer inputVariable, int currentIndex) {
+    ValueReference parameter = getAttributeParameter(currentIndex);
+    if (parameter.isFieldReader()
+        && !inputVariable.isReader()
+        && !Types.isComplex(inputVariable.getMajorType())
+        && inputVariable.getMinorType() != MinorType.UNION) {
+      JType singularReaderClass = model._ref(
+          TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(),
+          inputVariable.getMajorType().getMode()));
+      JType fieldReadClass = getParamClass(model, parameter, inputVariable.getHolder().type());
+      JInvocation reader = JExpr._new(singularReaderClass).arg(inputVariable.getHolder());
+      declare(jBlock, parameter, fieldReadClass, reader, currentIndex);
+    } else if (!parameter.isFieldReader()
+        && inputVariable.isReader()
+        && Types.isComplex(parameter.getType())) {
+      // For complex data-types (repeated maps/lists/dicts) the input to the aggregate will be a FieldReader. However, aggregate
+      // functions like ANY_VALUE, will assume the input to be a RepeatedMapHolder etc. Generate boilerplate code, to map
+      // from FieldReader to respective Holder.
+      if (Types.isComplex(parameter.getType())) {
+        JType holderClass = getParamClass(model, parameter, inputVariable.getHolder().type());
+        JAssignmentTarget holderVar = declare(jBlock, parameter, holderClass, JExpr._new(holderClass), currentIndex);
+        jBlock.assign(holderVar.ref("reader"), inputVariable.getHolder());
+      }
+    } else {
+      JExpression exprToAssign = inputVariable.getHolder();
+      if (parameter.isVarArg() && parameter.isFieldReader() && Types.isUnion(inputVariable.getMajorType())) {
+        exprToAssign = exprToAssign.ref("reader");
+      }
+      declare(jBlock, parameter, inputVariable.getHolder().type(), exprToAssign, currentIndex);
+    }
+  }
+
+  /**
    * Declares array for storing vararg function arguments.
    *
    * @param model          code model to generate the code
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/ColumnNamesOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/ColumnNamesOptions.java
new file mode 100644
index 0000000..0b9faca
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/ColumnNamesOptions.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.metastore;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+
+import java.util.StringJoiner;
+
+/**
+ * Holds system / session options that are used for obtaining partition / implicit / special column names.
+ */
+public class ColumnNamesOptions {
+  private final String fullyQualifiedName;
+  private final String partitionColumnNameLabel;
+  private final String rowGroupIndex;
+  private final String rowGroupStart;
+  private final String rowGroupLength;
+  private final String lastModifiedTime;
+
+  public ColumnNamesOptions(OptionManager optionManager) {
+    this.fullyQualifiedName = optionManager.getOption(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL).string_val;
+    this.partitionColumnNameLabel = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+    this.rowGroupIndex = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL).string_val;
+    this.rowGroupStart = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL).string_val;
+    this.rowGroupLength = optionManager.getOption(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL).string_val;
+    this.lastModifiedTime = optionManager.getOption(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL).string_val;
+  }
+
+  public String partitionColumnNameLabel() {
+    return partitionColumnNameLabel;
+  }
+
+  public String fullyQualifiedName() {
+    return fullyQualifiedName;
+  }
+
+  public String rowGroupIndex() {
+    return rowGroupIndex;
+  }
+
+  public String rowGroupStart() {
+    return rowGroupStart;
+  }
+
+  public String rowGroupLength() {
+    return rowGroupLength;
+  }
+
+  public String lastModifiedTime() {
+    return lastModifiedTime;
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", ColumnNamesOptions.class.getSimpleName() + "[", "]")
+        .add("fullyQualifiedName='" + fullyQualifiedName + "'")
+        .add("partitionColumnNameLabel='" + partitionColumnNameLabel + "'")
+        .add("rowGroupIndex='" + rowGroupIndex + "'")
+        .add("rowGroupStart='" + rowGroupStart + "'")
+        .add("rowGroupLength='" + rowGroupLength + "'")
+        .add("lastModifiedTime='" + lastModifiedTime + "'")
+        .toString();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java
index 0371cc2..a4bf0ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeFileInfoProvider.java
@@ -18,17 +18,14 @@
 package org.apache.drill.exec.metastore.analyze;
 
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatSelection;
@@ -37,7 +34,7 @@ import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.metastore.metadata.TableInfo;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
@@ -49,7 +46,7 @@ import java.util.stream.Collectors;
 public abstract class AnalyzeFileInfoProvider implements AnalyzeInfoProvider {
 
   @Override
-  public List<SchemaPath> getSegmentColumns(DrillTable table, OptionManager options) throws IOException {
+  public List<SchemaPath> getSegmentColumns(DrillTable table, ColumnNamesOptions columnNamesOptions) throws IOException {
     FormatSelection selection = (FormatSelection) table.getSelection();
 
     FileSelection fileSelection = selection.getSelection();
@@ -57,16 +54,17 @@ public abstract class AnalyzeFileInfoProvider implements AnalyzeInfoProvider {
       fileSelection = FileMetadataInfoCollector.getExpandedFileSelection(fileSelection);
     }
 
-    return ColumnExplorer.getPartitionColumnNames(fileSelection, options).stream()
+    return ColumnExplorer.getPartitionColumnNames(fileSelection, columnNamesOptions).stream()
         .map(SchemaPath::getSimplePath)
         .collect(Collectors.toList());
   }
 
   @Override
-  public List<SqlIdentifier> getProjectionFields(MetadataType metadataLevel, OptionManager options) {
-    return Arrays.asList(
-        new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL), SqlParserPos.ZERO),
-        new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL), SqlParserPos.ZERO));
+  public List<SchemaPath> getProjectionFields(DrillTable table, MetadataType metadataLevel, ColumnNamesOptions columnNamesOptions) throws IOException {
+    List<SchemaPath> projectionList = new ArrayList<>(getSegmentColumns(table, columnNamesOptions));
+    projectionList.add(SchemaPath.getSimplePath(columnNamesOptions.fullyQualifiedName()));
+    projectionList.add(SchemaPath.getSimplePath(columnNamesOptions.lastModifiedTime()));
+    return Collections.unmodifiableList(projectionList);
   }
 
   @Override
@@ -78,8 +76,8 @@ public abstract class AnalyzeFileInfoProvider implements AnalyzeInfoProvider {
   }
 
   @Override
-  public SchemaPath getLocationField(OptionManager optionManager) {
-    return SchemaPath.getSimplePath(optionManager.getString(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL));
+  public SchemaPath getLocationField(ColumnNamesOptions columnNamesOptions) {
+    return SchemaPath.getSimplePath(columnNamesOptions.fullyQualifiedName());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java
index 49b8430..88965c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeInfoProvider.java
@@ -18,13 +18,12 @@
 package org.apache.drill.exec.metastore.analyze;
 
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.metastore.components.tables.BasicTablesRequests;
 import org.apache.drill.metastore.metadata.MetadataType;
@@ -42,20 +41,21 @@ public interface AnalyzeInfoProvider {
   /**
    * Returns list of segment column names for specified {@link DrillTable} table.
    *
-   * @param table   table for which should be returned segment column names
-   * @param options option manager
+   * @param table              table for which should be returned segment column names
+   * @param columnNamesOptions column names option values
    * @return list of segment column names
    */
-  List<SchemaPath> getSegmentColumns(DrillTable table, OptionManager options) throws IOException;
+  List<SchemaPath> getSegmentColumns(DrillTable table, ColumnNamesOptions columnNamesOptions) throws IOException;
 
   /**
    * Returns list of fields required for ANALYZE.
    *
-   * @param metadataLevel metadata level for analyze
-   * @param options       option manager
+   * @param table              drill table
+   * @param metadataLevel      metadata level for analyze
+   * @param columnNamesOptions column names option values
    * @return list of fields required for ANALYZE
    */
-  List<SqlIdentifier> getProjectionFields(MetadataType metadataLevel, OptionManager options);
+  List<SchemaPath> getProjectionFields(DrillTable table, MetadataType metadataLevel, ColumnNamesOptions columnNamesOptions) throws IOException;
 
   /**
    * Returns {@link MetadataInfoCollector} instance for obtaining information about segments, files, etc.
@@ -79,10 +79,10 @@ public interface AnalyzeInfoProvider {
    * Provides schema path to field which will be used as a location for specific table data,
    * for example, for file-based tables, it may be `fqn`.
    *
-   * @param optionManager option manager
+   * @param columnNamesOptions column names option values
    * @return location field
    */
-  SchemaPath getLocationField(OptionManager optionManager);
+  SchemaPath getLocationField(ColumnNamesOptions columnNamesOptions);
 
   /**
    * Returns expression which may be used to determine parent location for specific table data,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java
index 4ce1424..f48e9cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/AnalyzeParquetInfoProvider.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.metastore.analyze;
 
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
 import org.apache.drill.metastore.metadata.MetadataType;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -38,12 +38,12 @@ public class AnalyzeParquetInfoProvider extends AnalyzeFileInfoProvider {
   public static final String TABLE_TYPE_NAME = "PARQUET";
 
   @Override
-  public List<SqlIdentifier> getProjectionFields(MetadataType metadataLevel, OptionManager options) {
-    List<SqlIdentifier> columnList = new ArrayList<>(super.getProjectionFields(metadataLevel, options));
+  public List<SchemaPath> getProjectionFields(DrillTable table, MetadataType metadataLevel, ColumnNamesOptions columnNamesOptions) throws IOException {
+    List<SchemaPath> columnList = new ArrayList<>(super.getProjectionFields(table, metadataLevel, columnNamesOptions));
     if (metadataLevel.includes(MetadataType.ROW_GROUP)) {
-      columnList.add(new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL), SqlParserPos.ZERO));
-      columnList.add(new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL), SqlParserPos.ZERO));
-      columnList.add(new SqlIdentifier(options.getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL), SqlParserPos.ZERO));
+      columnList.add(SchemaPath.getSimplePath(columnNamesOptions.rowGroupIndex()));
+      columnList.add(SchemaPath.getSimplePath(columnNamesOptions.rowGroupStart()));
+      columnList.add(SchemaPath.getSimplePath(columnNamesOptions.rowGroupLength()));
     }
     return Collections.unmodifiableList(columnList);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
index e9882d1..d9765d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
@@ -154,7 +154,8 @@ public class FileMetadataInfoCollector implements MetadataInfoCollector {
     String selectionRoot = selection.getSelection().getSelectionRoot().toUri().getPath();
 
     if (!Objects.equals(metastoreInterestingColumns, interestingColumns)
-        && (metastoreInterestingColumns == null || !metastoreInterestingColumns.containsAll(interestingColumns))
+        && metastoreInterestingColumns != null &&
+        (interestingColumns == null || !metastoreInterestingColumns.containsAll(interestingColumns))
         || TableStatisticsKind.ANALYZE_METADATA_LEVEL.getValue(basicRequests.tableMetadata(tableInfo)).compareTo(metadataLevel) != 0) {
       // do not update table scan and lists of segments / files / row groups,
       // metadata should be recalculated
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java
index 46f6df5..9108345 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataAggregateContext.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
 
 import java.util.List;
 import java.util.Objects;
@@ -36,12 +37,14 @@ public class MetadataAggregateContext {
   private final List<SchemaPath> interestingColumns;
   private final List<SchemaPath> excludedColumns;
   private final boolean createNewAggregations;
+  private final MetadataType metadataLevel;
 
   public MetadataAggregateContext(MetadataAggregateContextBuilder builder) {
     this.groupByExpressions = builder.groupByExpressions;
     this.interestingColumns = builder.interestingColumns;
     this.createNewAggregations = builder.createNewAggregations;
     this.excludedColumns = builder.excludedColumns;
+    this.metadataLevel = builder.metadataLevel;
   }
 
   @JsonProperty
@@ -64,6 +67,11 @@ public class MetadataAggregateContext {
     return excludedColumns;
   }
 
+  @JsonProperty
+  public MetadataType metadataLevel() {
+    return metadataLevel;
+  }
+
   @Override
   public String toString() {
     return new StringJoiner(",\n", MetadataAggregateContext.class.getSimpleName() + "[", "]")
@@ -78,11 +86,21 @@ public class MetadataAggregateContext {
     return new MetadataAggregateContextBuilder();
   }
 
+  public MetadataAggregateContextBuilder toBuilder() {
+    return new MetadataAggregateContextBuilder()
+        .groupByExpressions(groupByExpressions)
+        .interestingColumns(interestingColumns)
+        .createNewAggregations(createNewAggregations)
+        .excludedColumns(excludedColumns)
+        .metadataLevel(metadataLevel);
+  }
+
   @JsonPOJOBuilder(withPrefix = "")
   public static class MetadataAggregateContextBuilder {
     private List<NamedExpression> groupByExpressions;
     private List<SchemaPath> interestingColumns;
     private Boolean createNewAggregations;
+    private MetadataType metadataLevel;
     private List<SchemaPath> excludedColumns;
 
     public MetadataAggregateContextBuilder groupByExpressions(List<NamedExpression> groupByExpressions) {
@@ -90,6 +108,11 @@ public class MetadataAggregateContext {
       return this;
     }
 
+    public MetadataAggregateContextBuilder metadataLevel(MetadataType metadataLevel) {
+      this.metadataLevel = metadataLevel;
+      return this;
+    }
+
     public MetadataAggregateContextBuilder interestingColumns(List<SchemaPath> interestingColumns) {
       this.interestingColumns = interestingColumns;
       return this;
@@ -109,6 +132,7 @@ public class MetadataAggregateContext {
       Objects.requireNonNull(groupByExpressions, "groupByExpressions were not set");
       Objects.requireNonNull(createNewAggregations, "createNewAggregations was not set");
       Objects.requireNonNull(excludedColumns, "excludedColumns were not set");
+      Objects.requireNonNull(metadataLevel, "metadataLevel was not set");
       return new MetadataAggregateContext(this);
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index de13ee5..2371c6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -85,9 +85,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  * Represents table group scan with metadata usage.
  */
-public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupScan {
+public abstract class AbstractGroupScanWithMetadata<P extends TableMetadataProvider> extends AbstractFileGroupScan {
 
-  protected TableMetadataProvider metadataProvider;
+  protected P metadataProvider;
 
   // table metadata info
   protected TableMetadata tableMetadata;
@@ -118,7 +118,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     this.filter = filter;
   }
 
-  protected AbstractGroupScanWithMetadata(AbstractGroupScanWithMetadata that) {
+  protected AbstractGroupScanWithMetadata(AbstractGroupScanWithMetadata<P> that) {
     super(that.getUserName());
     this.columns = that.columns;
     this.filter = that.filter;
@@ -215,7 +215,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
   }
 
   @Override
-  public TableMetadataProvider getMetadataProvider() {
+  public P getMetadataProvider() {
     return metadataProvider;
   }
 
@@ -516,7 +516,13 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
   // partition pruning methods start
   @Override
   public List<SchemaPath> getPartitionColumns() {
-    return partitionColumns != null ? partitionColumns : new ArrayList<>();
+    if (partitionColumns == null) {
+      partitionColumns = metadataProvider.getPartitionColumns();
+      if (partitionColumns == null) {
+        partitionColumns = new ArrayList<>();
+      }
+    }
+    return partitionColumns;
   }
 
   @JsonIgnore
@@ -567,8 +573,8 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     return ColumnExplorer.isPartitionColumn(optionManager, schemaPath) || implicitColNames.contains(schemaPath.getRootSegmentPath());
   }
 
-  // protected methods for internal usage
-  protected Map<Path, FileMetadata> getFilesMetadata() {
+  @JsonIgnore
+  public Map<Path, FileMetadata> getFilesMetadata() {
     if (files == null) {
       files = metadataProvider.getFilesMetadataMap();
     }
@@ -583,14 +589,16 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     return tableMetadata;
   }
 
-  protected List<PartitionMetadata> getPartitionsMetadata() {
+  @JsonIgnore
+  public List<PartitionMetadata> getPartitionsMetadata() {
     if (partitions == null) {
       partitions = metadataProvider.getPartitionsMetadata();
     }
     return partitions;
   }
 
-  protected Map<Path, SegmentMetadata> getSegmentsMetadata() {
+  @JsonIgnore
+  public Map<Path, SegmentMetadata> getSegmentsMetadata() {
     if (segments == null) {
       segments = metadataProvider.getSegmentsMetadataMap();
     }
@@ -614,7 +622,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
    * This class is responsible for filtering different metadata levels.
    */
   protected abstract static class GroupScanWithMetadataFilterer<B extends GroupScanWithMetadataFilterer<B>> {
-    protected final AbstractGroupScanWithMetadata source;
+    protected final AbstractGroupScanWithMetadata<? extends TableMetadataProvider> source;
 
     protected boolean matchAllMetadata = false;
 
@@ -952,7 +960,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
         Iterable<T> metadataList,
         FilterPredicate<?> filterPredicate,
         OptionManager optionManager) {
-      List<T> qualifiedFiles = new ArrayList<>();
+      List<T> qualifiedMetadata = new ArrayList<>();
 
       for (T metadata : metadataList) {
         TupleMetadata schema = metadata.getSchema();
@@ -983,12 +991,12 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
         if (matchAllMetadata) {
           matchAllMetadata = match == RowsMatch.ALL;
         }
-        qualifiedFiles.add(metadata);
+        qualifiedMetadata.add(metadata);
       }
-      if (qualifiedFiles.isEmpty()) {
+      if (qualifiedMetadata.isEmpty()) {
         matchAllMetadata = false;
       }
-      return qualifiedFiles;
+      return qualifiedMetadata;
     }
 
     protected abstract B self();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
index 592f7c3..0828bea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -32,8 +32,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("hash-to-merge-exchange")
-public class HashToMergeExchange extends AbstractExchange{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class);
+public class HashToMergeExchange extends AbstractExchange {
 
   private final LogicalExpression distExpr;
   private final List<Ordering> orderExprs;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataHashAggPOP.java
similarity index 62%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataHashAggPOP.java
index f31735c..35990c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataHashAggPOP.java
@@ -20,29 +20,41 @@ package org.apache.drill.exec.physical.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.Collections;
 
-@JsonTypeName("metadataAggregate")
-public class MetadataAggPOP extends StreamingAggregate {
+@JsonTypeName("metadataHashAggregate")
+public class MetadataHashAggPOP extends HashAggregate {
   private final MetadataAggregateContext context;
+  private final OperatorPhase phase;
 
   @JsonCreator
-  public MetadataAggPOP(@JsonProperty("child") PhysicalOperator child,
-      @JsonProperty("context") MetadataAggregateContext context) {
-    super(child, context.groupByExpressions(), Collections.emptyList());
+  public MetadataHashAggPOP(@JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("context") MetadataAggregateContext context,
+      @JsonProperty("phase") OperatorPhase phase) {
+    super(child, phase, context.groupByExpressions(), Collections.emptyList(), 1.0F);
+    Preconditions.checkArgument(context.createNewAggregations(),
+        "Hash aggregate for metadata collecting should be used only for creating new aggregations.");
     this.context = context;
+    this.phase = phase;
   }
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new MetadataAggPOP(child, context);
+    return new MetadataHashAggPOP(child, context, phase);
   }
 
   @JsonProperty
   public MetadataAggregateContext getContext() {
     return context;
   }
+
+  @JsonProperty
+  public OperatorPhase getPhase() {
+    return phase;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataStreamAggPOP.java
similarity index 72%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataStreamAggPOP.java
index f31735c..e7220c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataAggPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MetadataStreamAggPOP.java
@@ -22,27 +22,36 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 
 import java.util.Collections;
 
-@JsonTypeName("metadataAggregate")
-public class MetadataAggPOP extends StreamingAggregate {
+@JsonTypeName("metadataStreamAggregate")
+public class MetadataStreamAggPOP extends StreamingAggregate {
   private final MetadataAggregateContext context;
+  private final OperatorPhase phase;
 
   @JsonCreator
-  public MetadataAggPOP(@JsonProperty("child") PhysicalOperator child,
-      @JsonProperty("context") MetadataAggregateContext context) {
+  public MetadataStreamAggPOP(@JsonProperty("child") PhysicalOperator child,
+      @JsonProperty("context") MetadataAggregateContext context,
+      @JsonProperty("phase") OperatorPhase phase) {
     super(child, context.groupByExpressions(), Collections.emptyList());
     this.context = context;
+    this.phase = phase;
   }
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new MetadataAggPOP(child, context);
+    return new MetadataStreamAggPOP(child, context, phase);
   }
 
   @JsonProperty
   public MetadataAggregateContext getContext() {
     return context;
   }
+
+  @JsonProperty
+  public OperatorPhase getPhase() {
+    return phase;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index f905687..45c670b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,11 +18,20 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
 import org.apache.drill.exec.planner.physical.AggPrelBase;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -69,20 +78,25 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
+  static final Logger logger = LoggerFactory.getLogger(HashAggBatch.class);
 
   private HashAggregator aggregator;
-  private RecordBatch incoming;
+  protected RecordBatch incoming;
   private LogicalExpression[] aggrExprs;
   private TypedFieldId[] groupByOutFieldIds;
   private TypedFieldId[] aggrOutFieldIds;      // field ids for the outgoing batch
   private final List<Comparator> comparators;
   private BatchSchema incomingSchema;
   private boolean wasKilled;
+  private List<BaseWriter.ComplexWriter> complexWriters;
 
-  private int numGroupByExprs, numAggrExprs;
+  private int numGroupByExprs;
+  private int numAggrExprs;
+  private boolean firstBatch = true;
 
   // This map saves the mapping between outgoing column and incoming column.
   private Map<String, String> columnMapping;
@@ -136,13 +150,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
             valuesRowWidth += ((FixedWidthVector) w.getValueVector()).getValueWidth();
           }
         } else {
-          int columnWidth;
+          int columnWidth = 0;
+          TypeProtos.MajorType type = w.getField().getType();
           if (columnMapping.get(w.getValueVector().getField().getName()) == null) {
-             columnWidth = TypeHelper.getSize(w.getField().getType());
+            if (!Types.isComplex(type)) {
+              columnWidth = TypeHelper.getSize(type);
+            }
           } else {
-            RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(columnMapping.get(w.getValueVector().getField().getName()));
+            RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer()
+                .getColumn(columnMapping.get(w.getValueVector().getField().getName()));
             if (columnSize == null) {
-              columnWidth = TypeHelper.getSize(w.getField().getType());
+              columnWidth = TypeHelper.getSize(type);
             } else {
               columnWidth = columnSize.getAllocSizePerEntry();
             }
@@ -214,6 +232,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
+  public VectorContainer getOutgoingContainer() {
+    return container;
+  }
+
+  @Override
   public int getRecordCount() {
     if (state == BatchState.DONE) {
       return 0;
@@ -222,7 +245,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
+  public void buildSchema() {
     IterOutcome outcome = next(incoming);
     switch (outcome) {
       case NONE:
@@ -305,7 +328,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       state = BatchState.DONE;
       // fall through
     case RETURN_OUTCOME:
-      return aggregator.getOutcome();
+      // rebuilds the schema in the case of complex writer expressions,
+      // since vectors would be added to batch run-time
+      IterOutcome outcome = aggregator.getOutcome();
+      switch (outcome) {
+        case OK:
+        case OK_NEW_SCHEMA:
+          if (firstBatch) {
+            if (CollectionUtils.isNotEmpty(complexWriters)) {
+              container.buildSchema(SelectionVectorMode.NONE);
+              outcome = IterOutcome.OK_NEW_SCHEMA;
+            }
+            firstBatch = false;
+          }
+          // fall thru
+        default:
+          return outcome;
+      }
 
     case UPDATE_AGGREGATOR:
       context.getExecutorState().fail(UserException.unsupportedError()
@@ -316,6 +355,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
           .build(logger));
       close();
       killIncoming(false);
+      firstBatch = false;
       return IterOutcome.STOP;
     default:
       throw new IllegalStateException(String.format("Unknown state %s.", out));
@@ -345,7 +385,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     }
   }
 
-  private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException,
+  @SuppressWarnings("unused") // used in generated code
+  public void addComplexWriter(final BaseWriter.ComplexWriter writer) {
+    complexWriters.add(writer);
+  }
+
+  protected HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException,
       IOException {
     CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getOptions());
     ClassGenerator<HashAggregator> cg = top.getRoot();
@@ -355,8 +400,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     // top.saveCodeForDebugging(true);
     container.clear();
 
-    numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
-    numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().size() : 0;
+    numGroupByExprs = (getKeyExpressions() != null) ? getKeyExpressions().size() : 0;
+    numAggrExprs = (getValueExpressions() != null) ? getValueExpressions().size() : 0;
     aggrExprs = new LogicalExpression[numAggrExprs];
     groupByOutFieldIds = new TypedFieldId[numGroupByExprs];
     aggrOutFieldIds = new TypedFieldId[numAggrExprs];
@@ -364,7 +409,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     ErrorCollector collector = new ErrorCollectorImpl();
 
     for (int i = 0; i < numGroupByExprs; i++) {
-      NamedExpression ne = popConfig.getGroupByExprs().get(i);
+      NamedExpression ne = getKeyExpressions().get(i);
       final LogicalExpression expr =
           ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
       if (expr == null) {
@@ -381,7 +426,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
     int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column
     for (int i = 0; i < numAggrExprs; i++) {
-      NamedExpression ne = popConfig.getAggrExprs().get(i);
+      NamedExpression ne = getValueExpressions().get(i);
       final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
 
       if (expr instanceof IfExpression) {
@@ -396,30 +441,45 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         continue;
       }
 
-      final MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
-      ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
-      aggrOutFieldIds[i] = container.add(vv);
+      // Populate the complex writers for complex exprs
+      if (expr instanceof DrillFuncHolderExpr &&
+          ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
+        if (complexWriters == null) {
+          complexWriters = new ArrayList<>();
+        } else {
+          complexWriters.clear();
+        }
+        // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
+        ((DrillFuncHolderExpr) expr).setFieldReference(ne.getRef());
+        MaterializedField field = MaterializedField.create(ne.getRef().getAsNamePart().getName(), UntypedNullHolder.TYPE);
+        container.add(new UntypedNullVector(field, container.getAllocator()));
+        aggrExprs[i] = expr;
+      } else {
+        MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
+        ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+        aggrOutFieldIds[i] = container.add(vv);
 
-      aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true);
+        aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true);
 
-      if (expr instanceof FunctionHolderExpression) {
-        String funcName = ((FunctionHolderExpression) expr).getName();
-        if (funcName.equals("sum") || funcName.equals("max") || funcName.equals("min")) {
-          extraNonNullColumns++;
-        }
-        List<LogicalExpression> args = ((FunctionCall) ne.getExpr()).args;
-        if (!args.isEmpty()) {
-          if (args.get(0) instanceof SchemaPath) {
-            columnMapping.put(outputField.getName(), ((SchemaPath) args.get(0)).getAsNamePart().getName());
-          } else if (args.get(0) instanceof FunctionCall) {
-            FunctionCall functionCall = (FunctionCall) args.get(0);
-            if (functionCall.args.get(0) instanceof SchemaPath) {
-              columnMapping.put(outputField.getName(), ((SchemaPath) functionCall.args.get(0)).getAsNamePart().getName());
+        if (expr instanceof FunctionHolderExpression) {
+          String funcName = ((FunctionHolderExpression) expr).getName();
+          if (funcName.equals("sum") || funcName.equals("max") || funcName.equals("min")) {
+            extraNonNullColumns++;
+          }
+          List<LogicalExpression> args = ((FunctionCall) ne.getExpr()).args;
+          if (!args.isEmpty()) {
+            if (args.get(0) instanceof SchemaPath) {
+              columnMapping.put(outputField.getName(), ((SchemaPath) args.get(0)).getAsNamePart().getName());
+            } else if (args.get(0) instanceof FunctionCall) {
+              FunctionCall functionCall = (FunctionCall) args.get(0);
+              if (functionCall.args.get(0) instanceof SchemaPath) {
+                columnMapping.put(outputField.getName(), ((SchemaPath) functionCall.args.get(0)).getAsNamePart().getName());
+              }
             }
           }
+        } else {
+          columnMapping.put(outputField.getName(), ne.getRef().getAsNamePart().getName());
         }
-      } else {
-        columnMapping.put(outputField.getName(), ne.getRef().getAsNamePart().getName());
       }
     }
 
@@ -433,7 +493,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     HashTableConfig htConfig =
         // TODO - fix the validator on this option
         new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
-            HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
+            HashTable.DEFAULT_LOAD_FACTOR, getKeyExpressions(), null /* no probe exprs */, comparators);
 
     agg.setup(popConfig, htConfig, context, oContext, incoming, this,
         aggrExprs,
@@ -445,6 +505,14 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     return agg;
   }
 
+  protected List<NamedExpression> getKeyExpressions() {
+    return popConfig.getGroupByExprs();
+  }
+
+  protected List<NamedExpression> getValueExpressions() {
+    return popConfig.getAggrExprs();
+  }
+
   private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) {
     cg.setMappingSet(UpdateAggrValuesMapping);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index d166353..7fcfd99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -34,6 +34,8 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -779,12 +781,19 @@ public abstract class HashAggTemplate implements HashAggregator {
     while (outgoingIter.hasNext()) {
       ValueVector vv = outgoingIter.next().getValueVector();
 
-      AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
+      // Prevent allocating complex vectors here to avoid losing their content
+      // since their writers will still be used in generated code
+      TypeProtos.MajorType majorType = vv.getField().getType();
+      if (!Types.isComplex(majorType)
+          && !Types.isUnion(majorType)
+          && !Types.isRepeated(majorType)) {
+        AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
+      }
     }
 
     long memAdded = allocator.getAllocatedMemory() - allocatedBefore;
-    if ( memAdded > estOutgoingAllocSize ) {
-      logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...",memAdded,estOutgoingAllocSize);
+    if (memAdded > estOutgoingAllocSize) {
+      logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...", memAdded, estOutgoingAllocSize);
       estOutgoingAllocSize = memAdded;
     }
     outContainer.setRecordCount(records);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 8fc7118..c3b504a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -45,7 +45,6 @@ import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.DrillFuncHolderExpr;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.HoldingContainerExpression;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -476,8 +475,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       keyExprs[i] = expr;
       MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
                                                                       expr.getMajorType());
-      ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
-      keyOutputIds[i] = container.add(vector);
+      container.addOrGet(outputField);
+      keyOutputIds[i] = container.getValueVectorId(ne.getRef());
     }
 
     for (int i = 0; i < valueExprs.length; i++) {
@@ -501,15 +500,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           complexWriters.clear();
         }
         // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
-        ((DrillFuncHolderExpr) expr).getFieldReference(ne.getRef());
+        ((DrillFuncHolderExpr) expr).setFieldReference(ne.getRef());
         MaterializedField field = MaterializedField.create(ne.getRef().getAsNamePart().getName(), UntypedNullHolder.TYPE);
         container.add(new UntypedNullVector(field, container.getAllocator()));
         valueExprs[i] = expr;
       } else {
         MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
             expr.getMajorType());
-        ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
-        TypedFieldId id = container.add(vector);
+        container.addOrGet(outputField);
+        TypedFieldId id = container.getValueVectorId(ne.getRef());
         valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index cee7625..34e9d7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -440,7 +440,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
         }
 
         // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
-        ((DrillFuncHolderExpr) expr).getFieldReference(namedExpression.getRef());
+        ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef());
         cg.addExpr(expr);
       } else {
         // need to do evaluation.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggregateHelper.java
similarity index 68%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatch.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggregateHelper.java
index 51dfb15..1cca788 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggregateHelper.java
@@ -27,23 +27,17 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
-import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -53,26 +47,32 @@ import java.util.Map;
 import java.util.stream.StreamSupport;
 
 /**
- * Operator which adds aggregate calls for all incoming columns to calculate required metadata and produces aggregations.
- * If aggregation is performed on top of another aggregation, required aggregate calls for merging metadata will be added.
+ * Helper class for constructing aggregate value expressions required for metadata collecting.
  */
-public class MetadataAggBatch extends StreamingAggBatch {
-
-  private List<NamedExpression> valueExpressions;
+public class MetadataAggregateHelper {
+  private final List<NamedExpression> valueExpressions;
+  private final MetadataAggregateContext context;
+  private final ColumnNamesOptions columnNamesOptions;
+  private final BatchSchema schema;
+  private final AggPrelBase.OperatorPhase phase;
 
-  public MetadataAggBatch(MetadataAggPOP popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
-    super(popConfig, incoming, context);
+  public MetadataAggregateHelper(MetadataAggregateContext context, ColumnNamesOptions columnNamesOptions,
+      BatchSchema schema, AggPrelBase.OperatorPhase phase) {
+    this.context = context;
+    this.columnNamesOptions = columnNamesOptions;
+    this.schema = schema;
+    this.phase = phase;
+    this.valueExpressions = new ArrayList<>();
+    createAggregatorInternal();
   }
 
-  @Override
-  protected StreamingAggregator createAggregatorInternal()
-      throws SchemaChangeException, ClassTransformationException, IOException {
-    valueExpressions = new ArrayList<>();
-    MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
+  public List<NamedExpression> getValueExpressions() {
+    return valueExpressions;
+  }
 
-    List<SchemaPath> excludedColumns = popConfig.getContext().excludedColumns();
+  private void createAggregatorInternal() {
+    List<SchemaPath> excludedColumns = context.excludedColumns();
 
-    BatchSchema schema = incoming.getSchema();
     // Iterates through input expressions and adds aggregate calls for table fields
     // to collect required statistics (MIN, MAX, COUNT, etc.) or aggregate calls to merge incoming metadata
     getUnflattenedFileds(Lists.newArrayList(schema), null)
@@ -90,19 +90,36 @@ public class MetadataAggBatch extends StreamingAggBatch {
           fieldsList.add(FieldReference.getWithQuotedRef(filedName));
         });
 
-    if (popConfig.getContext().createNewAggregations()) {
+    if (createNewAggregations()) {
       addMetadataAggregateCalls();
       // infer schema from incoming data
       addSchemaCall(fieldsList);
-      addNewMetadataAggregations();
+      // adds any_value(`location`) call for SEGMENT level
+      if (context.metadataLevel() == MetadataType.SEGMENT) {
+        addLocationAggCall(columnNamesOptions.fullyQualifiedName());
+      }
     } else {
-      addCollectListCall(fieldsList);
+      if (!context.createNewAggregations()) {
+        // collects incoming metadata
+        addCollectListCall(fieldsList);
+      }
       addMergeSchemaCall();
+      String locationField = MetastoreAnalyzeConstants.LOCATION_FIELD;
+
+      if (context.createNewAggregations()) {
+        locationField = columnNamesOptions.fullyQualifiedName();
+      }
+
+      if (context.metadataLevel() == MetadataType.SEGMENT) {
+        addParentLocationAggCall();
+      } else {
+        addLocationAggCall(locationField);
+      }
     }
 
     for (SchemaPath excludedColumn : excludedColumns) {
-      if (excludedColumn.equals(SchemaPath.getSimplePath(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL)))
-          || excludedColumn.equals(SchemaPath.getSimplePath(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL)))) {
+      if (excludedColumn.equals(SchemaPath.getSimplePath(columnNamesOptions.rowGroupStart()))
+          || excludedColumn.equals(SchemaPath.getSimplePath(columnNamesOptions.rowGroupLength()))) {
         LogicalExpression lastModifiedTime = new FunctionCall("any_value",
             Collections.singletonList(
                 FieldReference.getWithQuotedRef(excludedColumn.getRootSegmentPath())),
@@ -113,20 +130,69 @@ public class MetadataAggBatch extends StreamingAggBatch {
       }
     }
 
-    addMaxLastModifiedCall();
+    addLastModifiedCall();
+  }
+
+  /**
+   * Adds any_value(parentPath(`location`)) aggregate call to the value expressions list.
+   */
+  private void addParentLocationAggCall() {
+    valueExpressions.add(
+        new NamedExpression(
+            new FunctionCall(
+                "any_value",
+                Collections.singletonList(
+                    new FunctionCall(
+                        "parentPath",
+                        Collections.singletonList(SchemaPath.getSimplePath(MetastoreAnalyzeConstants.LOCATION_FIELD)),
+                        ExpressionPosition.UNKNOWN)),
+                ExpressionPosition.UNKNOWN),
+            FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
+  }
+
+  /**
+   * Adds any_value(`location`) aggregate call to the value expressions list.
+   *
+   * @param locationField name of the location field
+   */
+  private void addLocationAggCall(String locationField) {
+    valueExpressions.add(
+        new NamedExpression(
+            new FunctionCall(
+                "any_value",
+                Collections.singletonList(SchemaPath.getSimplePath(locationField)),
+                ExpressionPosition.UNKNOWN),
+            FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
+  }
 
-    return super.createAggregatorInternal();
+  /**
+   * Checks whether incoming data is not grouped, so corresponding aggregate calls should be created.
+   *
+   * @return {@code true} if incoming data is not grouped, {@code false} otherwise.
+   */
+  private boolean createNewAggregations() {
+    return context.createNewAggregations()
+        && (phase == AggPrelBase.OperatorPhase.PHASE_1of2
+        || phase == AggPrelBase.OperatorPhase.PHASE_1of1);
   }
 
   /**
    * Adds {@code max(`lastModifiedTime`)} function call to the value expressions list.
    */
-  private void addMaxLastModifiedCall() {
-    String lastModifiedColumn = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
-    LogicalExpression lastModifiedTime = new FunctionCall("max",
-        Collections.singletonList(
-            FieldReference.getWithQuotedRef(lastModifiedColumn)),
-        ExpressionPosition.UNKNOWN);
+  private void addLastModifiedCall() {
+    String lastModifiedColumn = columnNamesOptions.lastModifiedTime();
+    LogicalExpression lastModifiedTime;
+    if (createNewAggregations()) {
+      lastModifiedTime = new FunctionCall("any_value",
+          Collections.singletonList(
+              FieldReference.getWithQuotedRef(lastModifiedColumn)),
+          ExpressionPosition.UNKNOWN);
+    } else {
+      lastModifiedTime = new FunctionCall("max",
+          Collections.singletonList(
+              FieldReference.getWithQuotedRef(lastModifiedColumn)),
+          ExpressionPosition.UNKNOWN);
+    }
 
     valueExpressions.add(new NamedExpression(lastModifiedTime,
         FieldReference.getWithQuotedRef(lastModifiedColumn)));
@@ -140,8 +206,7 @@ public class MetadataAggBatch extends StreamingAggBatch {
    */
   private void addCollectListCall(List<LogicalExpression> fieldList) {
     ArrayList<LogicalExpression> collectListArguments = new ArrayList<>(fieldList);
-    MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
-    List<SchemaPath> excludedColumns = popConfig.getContext().excludedColumns();
+    List<SchemaPath> excludedColumns = context.excludedColumns();
     // populate columns which weren't included in the schema, but should be collected to the COLLECTED_MAP_FIELD
     for (SchemaPath logicalExpressions : excludedColumns) {
       // adds string literal with field name to the list
@@ -170,17 +235,6 @@ public class MetadataAggBatch extends StreamingAggBatch {
   }
 
   /**
-   * Adds {@code collect_to_list_varchar(`fqn`)} call to collect file paths into the list.
-   */
-  private void addNewMetadataAggregations() {
-    LogicalExpression locationsExpr = new FunctionCall("collect_to_list_varchar",
-        Collections.singletonList(SchemaPath.getSimplePath(context.getOptions().getString(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL))),
-        ExpressionPosition.UNKNOWN);
-
-    valueExpressions.add(new NamedExpression(locationsExpr, FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATIONS_FIELD)));
-  }
-
-  /**
    * Adds a call to {@code schema()}} function with specified fields list
    * as arguments of this function to obtain their schema.
    *
@@ -220,8 +274,7 @@ public class MetadataAggBatch extends StreamingAggBatch {
     for (MaterializedField field : fields) {
       // statistics collecting is not supported for array types
       if (field.getType().getMode() != TypeProtos.DataMode.REPEATED) {
-        MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
-        List<SchemaPath> excludedColumns = popConfig.getContext().excludedColumns();
+        List<SchemaPath> excludedColumns = context.excludedColumns();
         // excludedColumns are applied for root fields only
         if (parentFields != null || !excludedColumns.contains(SchemaPath.getSimplePath(field.getName()))) {
           List<String> currentPath;
@@ -231,12 +284,12 @@ public class MetadataAggBatch extends StreamingAggBatch {
             currentPath = new ArrayList<>(parentFields);
             currentPath.add(field.getName());
           }
-          if (field.getType().getMinorType() == TypeProtos.MinorType.MAP && popConfig.getContext().createNewAggregations()) {
+          if (field.getType().getMinorType() == TypeProtos.MinorType.MAP && createNewAggregations()) {
             fieldNameRefMap.putAll(getUnflattenedFileds(field.getChildren(), currentPath));
           } else {
             SchemaPath schemaPath = SchemaPath.getCompoundPath(currentPath.toArray(new String[0]));
             // adds backticks for popConfig.createNewAggregations() to ensure that field will be parsed correctly
-            String name = popConfig.getContext().createNewAggregations() ? schemaPath.toExpr() : schemaPath.getRootSegmentPath();
+            String name = createNewAggregations() ? schemaPath.toExpr() : schemaPath.getRootSegmentPath();
             fieldNameRefMap.put(name, new FieldReference(schemaPath));
           }
         }
@@ -254,9 +307,8 @@ public class MetadataAggBatch extends StreamingAggBatch {
    * @param fieldName field name
    */
   private void addColumnAggregateCalls(FieldReference fieldRef, String fieldName) {
-    MetadataAggPOP popConfig = (MetadataAggPOP) this.popConfig;
-    List<SchemaPath> interestingColumns = popConfig.getContext().interestingColumns();
-    if (popConfig.getContext().createNewAggregations()) {
+    List<SchemaPath> interestingColumns = context.interestingColumns();
+    if (createNewAggregations()) {
       if (interestingColumns == null || interestingColumns.contains(fieldRef)) {
         // collect statistics for all or only interesting columns if they are specified
         AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.forEach((statisticsKind, sqlKind) -> {
@@ -280,9 +332,4 @@ public class MetadataAggBatch extends StreamingAggBatch {
       valueExpressions.add(new NamedExpression(functionCall, fieldRef));
     }
   }
-
-  @Override
-  protected List<NamedExpression> getValueExpressions() {
-    return valueExpressions;
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 0c94e3d..9ccae49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -21,8 +21,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
 import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -105,6 +105,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   private final Map<String, MetadataInfo> metadataToHandle;
   private final StatisticsRecordCollector statisticsCollector;
   private final List<TableMetadataUnit> metadataUnits;
+  private final ColumnNamesOptions columnNamesOptions;
 
   private boolean firstLeft = true;
   private boolean firstRight = true;
@@ -123,6 +124,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
             .collect(Collectors.toMap(MetadataInfo::identifier, Function.identity()));
     this.metadataUnits = new ArrayList<>();
     this.statisticsCollector = new StatisticsCollectorImpl();
+    this.columnNamesOptions = new ColumnNamesOptions(context.getOptions());
   }
 
   protected boolean setupNewSchema() {
@@ -132,6 +134,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
     container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null);
 
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setEmpty();
 
     return true;
   }
@@ -428,7 +431,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   private PartitionMetadata getPartitionMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
-    String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
 
     String segmentKey = segmentColumns.size() > 0
         ? reader.column(segmentColumns.iterator().next()).scalar().getString()
@@ -450,7 +452,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
         .columnsStatistics(columnStatistics)
         .metadataStatistics(metadataStatistics)
         .locations(getIncomingLocations(reader))
-        .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+        .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
 //            .column(SchemaPath.getSimplePath("dir1"))
 //            .partitionValues()
         .schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
@@ -460,7 +462,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   @SuppressWarnings("unchecked")
   private BaseTableMetadata getTableMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics) {
-    String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
     List<StatisticsHolder> updatedMetaStats = new ArrayList<>(metadataStatistics);
     updatedMetaStats.add(new StatisticsHolder(popConfig.getContext().analyzeMetadataLevel(), TableStatisticsKind.ANALYZE_METADATA_LEVEL));
 
@@ -477,7 +478,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
         .partitionKeys(Collections.emptyMap())
         .interestingColumns(popConfig.getContext().interestingColumns())
         .location(popConfig.getContext().location())
-        .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+        .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
         .schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
         .build();
 
@@ -494,7 +495,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   private SegmentMetadata getSegmentMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
-    String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
 
     String segmentKey = segmentColumns.size() > 0
         ? reader.column(segmentColumns.iterator().next()).scalar().getString()
@@ -521,7 +521,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
         .locations(getIncomingLocations(reader))
         .column(segmentColumns.size() > 0 ? SchemaPath.getSimplePath(segmentColumns.get(nestingLevel - 1)) : null)
         .partitionValues(partitionValues)
-        .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+        .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
         .schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
         .build();
   }
@@ -529,7 +529,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   private FileMetadata getFileMetadata(TupleReader reader, List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
-    String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
 
     String segmentKey = segmentColumns.size() > 0
         ? reader.column(segmentColumns.iterator().next()).scalar().getString()
@@ -555,15 +554,13 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
         .columnsStatistics(columnStatistics)
         .metadataStatistics(metadataStatistics)
         .path(path)
-        .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+        .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
         .schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
         .build();
   }
 
   private RowGroupMetadata getRowGroupMetadata(TupleReader reader,List<StatisticsHolder> metadataStatistics,
       Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
-    String lastModifiedTimeCol = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
-    String rgi = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
 
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
     String segmentKey = segmentColumns.size() > 0
@@ -577,7 +574,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
 
     Path path = new Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString());
 
-    int rowGroupIndex = Integer.parseInt(reader.column(rgi).scalar().getString());
+    int rowGroupIndex = Integer.parseInt(reader.column(columnNamesOptions.rowGroupIndex()).scalar().getString());
 
     String metadataIdentifier = MetadataIdentifierUtils.getRowGroupMetadataIdentifier(partitionValues, path, rowGroupIndex);
 
@@ -595,7 +592,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
         .hostAffinity(Collections.emptyMap())
         .rowGroupIndex(rowGroupIndex)
         .path(path)
-        .lastModifiedTime(Long.parseLong(reader.column(lastModifiedTimeCol).scalar().getString()))
+        .lastModifiedTime(Long.parseLong(reader.column(columnNamesOptions.lastModifiedTime()).scalar().getString()))
         .schema(TupleMetadata.of(reader.column(MetastoreAnalyzeConstants.SCHEMA_FIELD).scalar().getString()))
         .build();
   }
@@ -644,19 +641,22 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC
   @SuppressWarnings("unchecked")
   private List<StatisticsHolder> getMetadataStatistics(TupleReader reader, TupleMetadata columnMetadata) {
     List<StatisticsHolder> metadataStatistics = new ArrayList<>();
-    String rgs = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL);
-    String rgl = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL);
+    String rgs = columnNamesOptions.rowGroupStart();
+    String rgl = columnNamesOptions.rowGroupLength();
     for (ColumnMetadata column : columnMetadata) {
       String columnName = column.name();
+      ObjectReader objectReader = reader.column(columnName);
       if (AnalyzeColumnUtils.isMetadataStatisticsField(columnName)) {
-        metadataStatistics.add(new StatisticsHolder(reader.column(columnName).getObject(),
+        metadataStatistics.add(new StatisticsHolder(objectReader.getObject(),
             AnalyzeColumnUtils.getStatisticsKind(columnName)));
-      } else if (columnName.equals(rgs)) {
-        metadataStatistics.add(new StatisticsHolder(Long.parseLong(reader.column(columnName).scalar().getString()),
-            new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
-      } else if (columnName.equals(rgl)) {
-        metadataStatistics.add(new StatisticsHolder(Long.parseLong(reader.column(columnName).scalar().getString()),
-            new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+      } else if (!objectReader.isNull()) {
+        if (columnName.equals(rgs)) {
+          metadataStatistics.add(new StatisticsHolder(Long.parseLong(objectReader.scalar().getString()),
+              new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
+        } else if (columnName.equals(rgl)) {
+          metadataStatistics.add(new StatisticsHolder(Long.parseLong(objectReader.scalar().getString()),
+              new BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+        }
       }
     }
     return metadataStatistics;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 7444a04..22e90fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.physical.impl.metadata;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.MetadataHandlerPOP;
@@ -81,6 +81,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
   private final Tables tables;
   private final MetadataType metadataType;
   private final Map<String, MetadataInfo> metadataToHandle;
+  private final ColumnNamesOptions columnNamesOptions;
 
   private boolean firstBatch = true;
 
@@ -89,6 +90,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
     super(popConfig, context, incoming);
     this.tables = context.getMetastoreRegistry().get().tables();
     this.metadataType = popConfig.getContext().metadataType();
+    this.columnNamesOptions = new ColumnNamesOptions(context.getOptions());
     this.metadataToHandle = popConfig.getContext().metadataToHandle() != null
         ? popConfig.getContext().metadataToHandle().stream()
             .collect(Collectors.toMap(MetadataInfo::identifier, Function.identity()))
@@ -101,7 +103,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
     // 2. For the case when incoming operator returned nothing - no updated underlying metadata was found.
     // 3. Fetches metadata which should be handled but wasn't returned by incoming batch from the Metastore
 
-    IterOutcome outcome = next(incoming);
+    IterOutcome outcome = incoming.getRecordCount() == 0 ? next(incoming) : getLastKnownOutcome();
 
     switch (outcome) {
       case NONE:
@@ -117,8 +119,7 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
             outcome = IterOutcome.OK;
           }
         }
-        doWorkInternal();
-        return outcome;
+        // fall thru
       case OK:
         assert !firstBatch : "First batch should be OK_NEW_SCHEMA";
         doWorkInternal();
@@ -286,14 +287,14 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
     }
 
     if (metadataType == MetadataType.ROW_GROUP) {
-      schemaBuilder.addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL), MinorType.VARCHAR);
-      schemaBuilder.addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL), MinorType.VARCHAR);
-      schemaBuilder.addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL), MinorType.VARCHAR);
+      schemaBuilder.addNullable(columnNamesOptions.rowGroupIndex(), MinorType.VARCHAR);
+      schemaBuilder.addNullable(columnNamesOptions.rowGroupStart(), MinorType.VARCHAR);
+      schemaBuilder.addNullable(columnNamesOptions.rowGroupLength(), MinorType.VARCHAR);
     }
 
     schemaBuilder
         .addNullable(MetastoreAnalyzeConstants.SCHEMA_FIELD, MinorType.VARCHAR)
-        .addNullable(context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL), MinorType.VARCHAR)
+        .addNullable(columnNamesOptions.lastModifiedTime(), MinorType.VARCHAR)
         .add(MetastoreAnalyzeConstants.METADATA_TYPE, MinorType.VARCHAR);
 
     ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
@@ -307,11 +308,6 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
   private <T extends BaseMetadata & LocationProvider> VectorContainer writeMetadataUsingBatchSchema(List<T> metadataList) {
     Preconditions.checkArgument(!metadataList.isEmpty(), "Metadata list shouldn't be empty.");
 
-    String lastModifiedTimeField = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
-    String rgiField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
-    String rgsField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL);
-    String rglField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL);
-
     ResultSetLoader resultSetLoader = getResultSetLoaderWithBatchSchema();
     resultSetLoader.startBatch();
     RowSetLoader rowWriter = resultSetLoader.writer();
@@ -352,13 +348,13 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
           arguments.add(new Object[]{});
         } else if (fieldName.equals(MetastoreAnalyzeConstants.SCHEMA_FIELD)) {
           arguments.add(metadata.getSchema().jsonString());
-        } else if (fieldName.equals(lastModifiedTimeField)) {
+        } else if (fieldName.equals(columnNamesOptions.lastModifiedTime())) {
           arguments.add(String.valueOf(metadata.getLastModifiedTime()));
-        } else if (fieldName.equals(rgiField)) {
+        } else if (fieldName.equals(columnNamesOptions.rowGroupIndex())) {
           arguments.add(String.valueOf(((RowGroupMetadata) metadata).getRowGroupIndex()));
-        } else if (fieldName.equals(rgsField)) {
+        } else if (fieldName.equals(columnNamesOptions.rowGroupStart())) {
           arguments.add(Long.toString(metadata.getStatistic(() -> ExactStatisticsConstants.START)));
-        } else if (fieldName.equals(rglField)) {
+        } else if (fieldName.equals(columnNamesOptions.rowGroupLength())) {
           arguments.add(Long.toString(metadata.getStatistic(() -> ExactStatisticsConstants.LENGTH)));
         } else if (fieldName.equals(MetastoreAnalyzeConstants.METADATA_TYPE)) {
           arguments.add(metadataType.name());
@@ -374,10 +370,6 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
   }
 
   private ResultSetLoader getResultSetLoaderWithBatchSchema() {
-    String lastModifiedTimeField = context.getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL);
-    String rgiField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
-    String rgsField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_START_COLUMN_LABEL);
-    String rglField = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_LENGTH_COLUMN_LABEL);
     SchemaBuilder schemaBuilder = new SchemaBuilder();
     // adds fields to the schema preserving their order to avoid issues in outcoming batches
     for (VectorWrapper<?> vectorWrapper : container) {
@@ -385,10 +377,10 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
       String fieldName = field.getName();
       if (fieldName.equals(MetastoreAnalyzeConstants.LOCATION_FIELD)
           || fieldName.equals(MetastoreAnalyzeConstants.SCHEMA_FIELD)
-          || fieldName.equals(lastModifiedTimeField)
-          || fieldName.equals(rgiField)
-          || fieldName.equals(rgsField)
-          || fieldName.equals(rglField)
+          || fieldName.equals(columnNamesOptions.lastModifiedTime())
+          || fieldName.equals(columnNamesOptions.rowGroupIndex())
+          || fieldName.equals(columnNamesOptions.rowGroupStart())
+          || fieldName.equals(columnNamesOptions.rowGroupLength())
           || fieldName.equals(MetastoreAnalyzeConstants.METADATA_TYPE)
           || popConfig.getContext().segmentColumns().contains(fieldName)) {
         schemaBuilder.add(fieldName, field.getType().getMinorType(), field.getDataMode());
@@ -416,9 +408,9 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
     container.clear();
     StreamSupport.stream(populatedContainer.spliterator(), false)
         .map(VectorWrapper::getField)
-        .filter(field -> field.getType().getMinorType() != MinorType.NULL)
         .forEach(container::addOrGet);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.setEmpty();
   }
 
   protected boolean setupNewSchema() {
@@ -445,18 +437,17 @@ public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHand
   }
 
   private void updateMetadataToHandle() {
-    RowSetReader reader = DirectRowSet.fromContainer(container).reader();
     // updates metadataToHandle to be able to fetch required data which wasn't returned by incoming batch
     if (metadataToHandle != null && !metadataToHandle.isEmpty()) {
+      RowSetReader reader = DirectRowSet.fromContainer(container).reader();
       switch (metadataType) {
         case ROW_GROUP: {
-          String rgiColumnName = context.getOptions().getString(ExecConstants.IMPLICIT_ROW_GROUP_INDEX_COLUMN_LABEL);
           while (reader.next() && !metadataToHandle.isEmpty()) {
             List<String> partitionValues = popConfig.getContext().segmentColumns().stream()
                 .map(columnName -> reader.column(columnName).scalar().getString())
                 .collect(Collectors.toList());
             Path location = new Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString());
-            int rgi = Integer.parseInt(reader.column(rgiColumnName).scalar().getString());
+            int rgi = Integer.parseInt(reader.column(columnNamesOptions.rowGroupIndex()).scalar().getString());
             metadataToHandle.remove(MetadataIdentifierUtils.getRowGroupMetadataIdentifier(partitionValues, location, rgi));
           }
           break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
new file mode 100644
index 0000000..7c83b2d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatch.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.metadata;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggregator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.IOException;
+import java.util.List;
+
+public class MetadataHashAggBatch extends HashAggBatch {
+  private List<NamedExpression> valueExpressions;
+
+  public MetadataHashAggBatch(MetadataHashAggPOP popConfig, RecordBatch incoming, FragmentContext context) {
+    super(popConfig, incoming, context);
+  }
+
+  @Override
+  protected HashAggregator createAggregatorInternal()
+      throws SchemaChangeException, ClassTransformationException, IOException {
+    MetadataHashAggPOP popConfig = (MetadataHashAggPOP) this.popConfig;
+
+    valueExpressions = new MetadataAggregateHelper(popConfig.getContext(),
+            new ColumnNamesOptions(context.getOptions()), incoming.getSchema(), popConfig.getPhase())
+        .getValueExpressions();
+
+    return super.createAggregatorInternal();
+  }
+
+  @Override
+  protected List<NamedExpression> getValueExpressions() {
+    return valueExpressions;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatchCreator.java
similarity index 80%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatchCreator.java
index eda40e9..2563b0e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHashAggBatchCreator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.metadata;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -27,12 +27,12 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.List;
 
-public class MetadataAggBatchCreator implements BatchCreator<MetadataAggPOP> {
+public class MetadataHashAggBatchCreator implements BatchCreator<MetadataHashAggPOP> {
 
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
-      MetadataAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+      MetadataHashAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
-    return new MetadataAggBatch(config, children.iterator().next(), context);
+    return new MetadataHashAggBatch(config, children.iterator().next(), context);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
new file mode 100644
index 0000000..d982179
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatch.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.metadata;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Operator which adds aggregate calls for all incoming columns to calculate required metadata and produces aggregations.
+ * If aggregation is performed on top of another aggregation, required aggregate calls for merging metadata will be added.
+ */
+public class MetadataStreamAggBatch extends StreamingAggBatch {
+
+  private List<NamedExpression> valueExpressions;
+
+  public MetadataStreamAggBatch(MetadataStreamAggPOP popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+    super(popConfig, incoming, context);
+  }
+
+  @Override
+  protected StreamingAggregator createAggregatorInternal()
+      throws SchemaChangeException, ClassTransformationException, IOException {
+    MetadataStreamAggPOP popConfig = (MetadataStreamAggPOP) this.popConfig;
+
+    valueExpressions = new MetadataAggregateHelper(popConfig.getContext(),
+            new ColumnNamesOptions(context.getOptions()), incoming.getSchema(), popConfig.getPhase())
+        .getValueExpressions();
+
+    return super.createAggregatorInternal();
+  }
+
+  @Override
+  protected List<NamedExpression> getValueExpressions() {
+    return valueExpressions;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatchCreator.java
similarity index 80%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatchCreator.java
index eda40e9..15c494b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataStreamAggBatchCreator.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.metadata;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
@@ -27,12 +27,12 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 import java.util.List;
 
-public class MetadataAggBatchCreator implements BatchCreator<MetadataAggPOP> {
+public class MetadataStreamAggBatchCreator implements BatchCreator<MetadataStreamAggPOP> {
 
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
-      MetadataAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+      MetadataStreamAggPOP config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.size() == 1);
-    return new MetadataAggBatch(config, children.iterator().next(), context);
+    return new MetadataStreamAggBatch(config, children.iterator().next(), context);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 674b3ab..185b8f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -516,7 +516,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         }
 
         // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
-        ((DrillFuncHolderExpr) expr).getFieldReference(namedExpression.getRef());
+        ((DrillFuncHolderExpr) expr).setFieldReference(namedExpression.getRef());
         cg.addExpr(expr, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
         if (complexFieldReferencesList == null) {
           complexFieldReferencesList = Lists.newArrayList();
@@ -578,7 +578,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   private boolean isImplicitFileColumn(ValueVector vvIn) {
-    return columnExplorer.isImplicitFileColumn(vvIn.getField().getName());
+    return columnExplorer.isImplicitOrInternalFileColumn(vvIn.getField().getName());
   }
 
   private List<NamedExpression> getExpressionList() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 39189b1..8793a65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -36,6 +36,10 @@ import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
 import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataHashAggBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataStreamAggBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataControllerBatch;
+import org.apache.drill.exec.physical.impl.metadata.MetadataHandlerBatch;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
 import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
@@ -247,6 +251,10 @@ public class BatchValidator {
     rules.put(HashJoinBatch.class, CheckMode.VECTORS);
     rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
     rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
+    rules.put(MetadataStreamAggBatch.class, CheckMode.VECTORS);
+    rules.put(MetadataHashAggBatch.class, CheckMode.VECTORS);
+    rules.put(MetadataHandlerBatch.class, CheckMode.VECTORS);
+    rules.put(MetadataControllerBatch.class, CheckMode.VECTORS);
     return rules;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
index 75dfeac..3bbe6ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/BaseReaderBuilder.java
@@ -46,7 +46,7 @@ import org.apache.drill.exec.vector.complex.UnionVector;
  * <p>
  * Derived classes handle the details of the various kinds of readers.
  * Today there is a single subclass that builds (test-time)
- * {@link RowSet} objects. The idea, however, is that we may eventually
+ * {@link org.apache.drill.exec.physical.rowSet.RowSet} objects. The idea, however, is that we may eventually
  * want to create a "result set reader" for use in internal operators,
  * in parallel to the "result set loader". The result set reader would
  * handle a stream of incoming batches. The extant RowSet class handles
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
index 4b734e6..9bf9f44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetFormatter.java
@@ -25,6 +25,7 @@ import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
@@ -60,6 +61,10 @@ public class RowSetFormatter {
     RowSets.wrap(batch).print();
   }
 
+  public static void print(RecordBatch batch) {
+    RowSets.wrap(batch).print();
+  }
+
   public static String toString(RowSet rowSet) {
     StringBuilderWriter out = new StringBuilderWriter();
     new RowSetFormatter(rowSet, out).write();
@@ -116,18 +121,18 @@ public class RowSetFormatter {
   }
 
   private void writeHeader(Writer writer, RowSetReader reader, SelectionVectorMode selectionMode) throws IOException {
-    writer.write(Integer.toString(reader.logicalIndex()));
+    writer.write(String.valueOf(reader.logicalIndex()));
     switch (selectionMode) {
       case FOUR_BYTE:
         writer.write(" (");
-        writer.write(reader.hyperVectorIndex());
+        writer.write(String.valueOf(reader.hyperVectorIndex()));
         writer.write(", ");
-        writer.write(Integer.toString(reader.offset()));
+        writer.write(String.valueOf(reader.offset()));
         writer.write(")");
         break;
       case TWO_BYTE:
         writer.write(" (");
-        writer.write(Integer.toString(reader.offset()));
+        writer.write(String.valueOf(reader.offset()));
         writer.write(")");
         break;
       default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index a3e907b..203150f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner;
 
+import org.apache.drill.exec.planner.logical.ConvertMetadataAggregateToDirectScanRule;
 import org.apache.drill.exec.planner.physical.MetadataAggPrule;
 import org.apache.drill.exec.planner.physical.MetadataControllerPrule;
 import org.apache.drill.exec.planner.physical.MetadataHandlerPrule;
@@ -525,6 +526,7 @@ public enum PlannerPhase {
     ruleList.add(MetadataControllerPrule.INSTANCE);
     ruleList.add(MetadataHandlerPrule.INSTANCE);
     ruleList.add(MetadataAggPrule.INSTANCE);
+    ruleList.add(ConvertMetadataAggregateToDirectScanRule.INSTANCE);
 
     ruleList.add(UnnestPrule.INSTANCE);
     ruleList.add(LateralJoinPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
index fb29cda..64a0b68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
@@ -102,7 +102,7 @@ public class ConvertCountToDirectScanRule extends RelOptRule {
   private static final Logger logger = LoggerFactory.getLogger(ConvertCountToDirectScanRule.class);
 
   private ConvertCountToDirectScanRule(RelOptRuleOperand rule, String id) {
-    super(rule, "ConvertCountToDirectScanRule:" + id);
+    super(rule, DrillRelFactories.LOGICAL_BUILDER, "ConvertCountToDirectScanRule:" + id);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
new file mode 100644
index 0000000..43f6383
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertMetadataAggregateToDirectScanRule.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.IsPredicate;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
+import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils;
+import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
+import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.ColumnExplorer.ImplicitFileColumns;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.direct.DirectGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.metadata.RowGroupMetadata;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
+import org.apache.drill.metastore.statistics.ExactStatisticsConstants;
+import org.apache.drill.metastore.statistics.StatisticsKind;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.shaded.guava.com.google.common.collect.HashBasedTable;
+import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Table;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Rule which converts
+ *
+ * <pre>
+ *   MetadataAggRel(metadataLevel=ROW_GROUP)
+ *   \
+ *   DrillScanRel
+ * </pre>
+ * <p/>
+ * plan into
+ * <pre>
+ *   DrillDirectScanRel
+ * </pre>
+ * where {@link DrillDirectScanRel} is populated with row group metadata.
+ * <p/>
+ * For the case when aggregate level is not ROW_GROUP, resulting plan will be the following:
+ *
+ * <pre>
+ *   MetadataAggRel(metadataLevel=FILE (or another non-ROW_GROUP value), createNewAggregations=false)
+ *   \
+ *   DrillDirectScanRel
+ * </pre>
+ */
+public class ConvertMetadataAggregateToDirectScanRule extends RelOptRule {
+  public static final ConvertMetadataAggregateToDirectScanRule INSTANCE =
+      new ConvertMetadataAggregateToDirectScanRule();
+
+  private static final Logger logger = LoggerFactory.getLogger(ConvertMetadataAggregateToDirectScanRule.class);
+
+  public ConvertMetadataAggregateToDirectScanRule() {
+    super(
+        RelOptHelper.some(MetadataAggRel.class, RelOptHelper.any(DrillScanRel.class)),
+        DrillRelFactories.LOGICAL_BUILDER, "ConvertMetadataAggregateToDirectScanRule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    MetadataAggRel agg = call.rel(0);
+    DrillScanRel scan = call.rel(1);
+
+    GroupScan oldGrpScan = scan.getGroupScan();
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+
+    // Only apply the rule for parquet group scan and for the case when required column metadata is present
+    if (!(oldGrpScan instanceof ParquetGroupScan)
+        || (oldGrpScan.getTableMetadata().getInterestingColumns() != null
+          && !oldGrpScan.getTableMetadata().getInterestingColumns().containsAll(agg.getContext().interestingColumns()))) {
+      return;
+    }
+
+    try {
+      DirectGroupScan directScan = buildDirectScan(agg.getContext().interestingColumns(), scan, settings);
+      if (directScan == null) {
+        logger.warn("Unable to use parquet metadata for ANALYZE since some required metadata is absent within parquet metadata");
+        return;
+      }
+
+      RelNode converted = new DrillDirectScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+          directScan, scan.getRowType());
+      if (agg.getContext().metadataLevel() != MetadataType.ROW_GROUP) {
+        MetadataAggregateContext updatedContext = agg.getContext().toBuilder()
+            .createNewAggregations(false)
+            .build();
+        converted = new MetadataAggRel(agg.getCluster(), agg.getTraitSet(), converted, updatedContext);
+      }
+
+      call.transformTo(converted);
+    } catch (Exception e) {
+      logger.warn("Unable to use parquet metadata for ANALYZE: {}", e.getMessage(), e);
+    }
+  }
+
+  private DirectGroupScan buildDirectScan(List<SchemaPath> interestingColumns, DrillScanRel scan, PlannerSettings settings) throws IOException {
+    DrillTable drillTable = Utilities.getDrillTable(scan.getTable());
+
+    ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(settings.getOptions());
+
+    // populates schema to be used when adding record values
+    FormatSelection selection = (FormatSelection) drillTable.getSelection();
+
+    // adds partition columns to the schema
+    Map<String, Class<?>> schema = ColumnExplorer.getPartitionColumnNames(selection.getSelection(), columnNamesOptions).stream()
+        .collect(Collectors.toMap(
+            Function.identity(),
+            s -> String.class,
+            (o, n) -> n));
+
+    // adds internal implicit columns to the schema
+
+    schema.put(MetastoreAnalyzeConstants.SCHEMA_FIELD, String.class);
+    schema.put(MetastoreAnalyzeConstants.LOCATION_FIELD, String.class);
+    schema.put(columnNamesOptions.rowGroupIndex(), String.class);
+    schema.put(columnNamesOptions.rowGroupStart(), String.class);
+    schema.put(columnNamesOptions.rowGroupLength(), String.class);
+    schema.put(columnNamesOptions.lastModifiedTime(), String.class);
+
+    return populateRecords(interestingColumns, schema, scan, columnNamesOptions);
+  }
+
+  /**
+   * Populates records list with row group metadata.
+   */
+  private DirectGroupScan populateRecords(Collection<SchemaPath> interestingColumns, Map<String, Class<?>> schema,
+      DrillScanRel scan, ColumnNamesOptions columnNamesOptions) throws IOException {
+    ParquetGroupScan parquetGroupScan = (ParquetGroupScan) scan.getGroupScan();
+    DrillTable drillTable = Utilities.getDrillTable(scan.getTable());
+
+    Multimap<Path, RowGroupMetadata> rowGroupsMetadataMap = parquetGroupScan.getMetadataProvider().getRowGroupsMetadataMap();
+
+    Table<String, Integer, Object> recordsTable = HashBasedTable.create();
+    FormatSelection selection = (FormatSelection) drillTable.getSelection();
+    List<String> partitionColumnNames = ColumnExplorer.getPartitionColumnNames(selection.getSelection(), columnNamesOptions);
+
+    FileSystem rawFs = selection.getSelection().getSelectionRoot().getFileSystem(new Configuration());
+    DrillFileSystem fileSystem =
+        ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), rawFs.getConf());
+
+    int rowIndex = 0;
+    for (Map.Entry<Path, RowGroupMetadata> rgEntry : rowGroupsMetadataMap.entries()) {
+      Path path = rgEntry.getKey();
+      RowGroupMetadata rowGroupMetadata = rgEntry.getValue();
+      List<String> partitionValues = ColumnExplorer.listPartitionValues(path, selection.getSelection().getSelectionRoot(), false);
+      for (int i = 0; i < partitionValues.size(); i++) {
+        String partitionColumnName = partitionColumnNames.get(i);
+        recordsTable.put(partitionColumnName, rowIndex, partitionValues.get(i));
+      }
+
+      recordsTable.put(MetastoreAnalyzeConstants.LOCATION_FIELD, rowIndex, ImplicitFileColumns.FQN.getValue(path));
+      recordsTable.put(columnNamesOptions.rowGroupIndex(), rowIndex, String.valueOf(rowGroupMetadata.getRowGroupIndex()));
+
+      if (interestingColumns == null) {
+        interestingColumns = rowGroupMetadata.getColumnsStatistics().keySet();
+      }
+
+      // populates record list with row group column metadata
+      for (SchemaPath schemaPath : interestingColumns) {
+        ColumnStatistics columnStatistics = rowGroupMetadata.getColumnsStatistics().get(schemaPath);
+        if (IsPredicate.isNullOrEmpty(columnStatistics)) {
+          logger.debug("Statistics for {} column wasn't found within {} row group.", schemaPath, path);
+          return null;
+        }
+        for (StatisticsKind statisticsKind : AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet()) {
+          Object statsValue;
+          if (statisticsKind.getName().equalsIgnoreCase(TableStatisticsKind.ROW_COUNT.getName())) {
+            statsValue = TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata);
+          } else if (statisticsKind.getName().equalsIgnoreCase(ColumnStatisticsKind.NON_NULL_COUNT.getName())) {
+            statsValue = TableStatisticsKind.ROW_COUNT.getValue(rowGroupMetadata) - ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics);
+          } else {
+            statsValue = columnStatistics.get(statisticsKind);
+          }
+          String columnStatisticsFieldName = AnalyzeColumnUtils.getColumnStatisticsFieldName(schemaPath.getRootSegmentPath(), statisticsKind);
+          if (statsValue != null) {
+            schema.putIfAbsent(
+                columnStatisticsFieldName,
+                statsValue.getClass());
+            recordsTable.put(columnStatisticsFieldName, rowIndex, statsValue);
+          }
+        }
+      }
+
+      // populates record list with row group metadata
+      for (StatisticsKind<?> statisticsKind : AnalyzeColumnUtils.META_STATISTICS_FUNCTIONS.keySet()) {
+        String metadataStatisticsFieldName = AnalyzeColumnUtils.getMetadataStatisticsFieldName(statisticsKind);
+        Object statisticsValue = rowGroupMetadata.getStatistic(statisticsKind);
+
+        if (statisticsValue != null) {
+          schema.putIfAbsent(metadataStatisticsFieldName, statisticsValue.getClass());
+          recordsTable.put(metadataStatisticsFieldName, rowIndex, statisticsValue);
+        }
+      }
+
+      // populates record list internal columns
+      recordsTable.put(MetastoreAnalyzeConstants.SCHEMA_FIELD, rowIndex, rowGroupMetadata.getSchema().jsonString());
+      recordsTable.put(columnNamesOptions.rowGroupStart(), rowIndex, Long.toString(rowGroupMetadata.getStatistic(() -> ExactStatisticsConstants.START)));
+      recordsTable.put(columnNamesOptions.rowGroupLength(), rowIndex, Long.toString(rowGroupMetadata.getStatistic(() -> ExactStatisticsConstants.LENGTH)));
+      recordsTable.put(columnNamesOptions.lastModifiedTime(), rowIndex, String.valueOf(fileSystem.getFileStatus(path).getModificationTime()));
+
+      rowIndex++;
+    }
+
+    // DynamicPojoRecordReader requires LinkedHashMap with fields order
+    // which corresponds to the value position in record list.
+    LinkedHashMap<String, Class<?>> orderedSchema = recordsTable.rowKeySet().stream()
+        .collect(Collectors.toMap(
+            Function.identity(),
+            column -> schema.getOrDefault(column, Integer.class),
+            (o, n) -> n,
+            LinkedHashMap::new));
+
+    IntFunction<List<Object>> collectRecord = currentIndex -> orderedSchema.keySet().stream()
+        .map(column -> recordsTable.get(column, currentIndex))
+        .collect(Collectors.toList());
+
+    List<List<Object>> records = IntStream.range(0, rowIndex)
+        .mapToObj(collectRecord)
+        .collect(Collectors.toList());
+
+    DynamicPojoRecordReader<?> reader = new DynamicPojoRecordReader<>(orderedSchema, records);
+
+    ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, records.size(), 1, schema.size());
+
+    return new DirectGroupScan(reader, scanStats);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
index 5683c77..6673c63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTrait.java
@@ -21,11 +21,11 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTrait;
 import org.apache.calcite.plan.RelTraitDef;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 
 public class DrillDistributionTrait implements RelTrait {
-  public static enum DistributionType {SINGLETON, HASH_DISTRIBUTED, RANGE_DISTRIBUTED, RANDOM_DISTRIBUTED,
-                                       ROUND_ROBIN_DISTRIBUTED, BROADCAST_DISTRIBUTED, ANY};
 
   public static DrillDistributionTrait SINGLETON = new DrillDistributionTrait(DistributionType.SINGLETON);
   public static DrillDistributionTrait RANDOM_DISTRIBUTED = new DrillDistributionTrait(DistributionType.RANDOM_DISTRIBUTED);
@@ -33,24 +33,24 @@ public class DrillDistributionTrait implements RelTrait {
 
   public static DrillDistributionTrait DEFAULT = ANY;
 
-  private DistributionType type;
-  private final ImmutableList<DistributionField> fields;
+  private final DistributionType type;
+  private final List<DistributionField> fields;
   private PartitionFunction partitionFunction = null;
 
   public DrillDistributionTrait(DistributionType type) {
     assert (type == DistributionType.SINGLETON || type == DistributionType.RANDOM_DISTRIBUTED || type == DistributionType.ANY
             || type == DistributionType.ROUND_ROBIN_DISTRIBUTED || type == DistributionType.BROADCAST_DISTRIBUTED);
     this.type = type;
-    this.fields = ImmutableList.<DistributionField>of();
+    this.fields = Collections.emptyList();
   }
 
-  public DrillDistributionTrait(DistributionType type, ImmutableList<DistributionField> fields) {
+  public DrillDistributionTrait(DistributionType type, List<DistributionField> fields) {
     assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
     this.type = type;
     this.fields = fields;
   }
 
-  public DrillDistributionTrait(DistributionType type, ImmutableList<DistributionField> fields,
+  public DrillDistributionTrait(DistributionType type, List<DistributionField> fields,
       PartitionFunction partitionFunction) {
     assert (type == DistributionType.HASH_DISTRIBUTED || type == DistributionType.RANGE_DISTRIBUTED);
     this.type = type;
@@ -105,7 +105,7 @@ public class DrillDistributionTrait implements RelTrait {
     return this.type;
   }
 
-  public ImmutableList<DistributionField> getFields() {
+  public List<DistributionField> getFields() {
     return fields;
   }
 
@@ -114,12 +114,7 @@ public class DrillDistributionTrait implements RelTrait {
   }
 
   private boolean arePartitionFunctionsSame(PartitionFunction f1, PartitionFunction f2) {
-    if (f1 != null && f2 != null) {
-      return f1.equals(f2);
-    } else if (f2 == null && f2 == null) {
-      return true;
-    }
-    return false;
+    return Objects.equals(f1, f2);
   }
 
   @Override
@@ -145,6 +140,15 @@ public class DrillDistributionTrait implements RelTrait {
     return fields == null ? this.type.toString() : this.type.toString() + "(" + fields + ")";
   }
 
+  public enum DistributionType {
+    SINGLETON,
+    HASH_DISTRIBUTED,
+    RANGE_DISTRIBUTED,
+    RANDOM_DISTRIBUTED,
+    ROUND_ROBIN_DISTRIBUTED,
+    BROADCAST_DISTRIBUTED,
+    ANY
+  }
 
   public static class DistributionField {
     /**
@@ -180,4 +184,51 @@ public class DrillDistributionTrait implements RelTrait {
     }
   }
 
+  /**
+   * Stores distribution field index and field name to be used in exchange operators.
+   * Field name is required for the case of dynamic schema discovering
+   * when field is not present in rel data type at planning time.
+   */
+  public static class NamedDistributionField extends DistributionField {
+    /**
+     * Name of the field being DISTRIBUTED.
+     */
+    private final String fieldName;
+
+    public NamedDistributionField(int fieldId, String fieldName) {
+      super(fieldId);
+      this.fieldName = fieldName;
+    }
+
+    public String getFieldName() {
+      return fieldName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+
+      NamedDistributionField that = (NamedDistributionField) o;
+
+      return Objects.equals(fieldName, that.fieldName);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(super.hashCode(), fieldName);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s(%s)", fieldName, getFieldId());
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index 7300a80..fdc03ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.planner.physical;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
@@ -59,8 +58,7 @@ public class HashAggPrule extends AggPruleBase {
     final DrillAggregateRel aggregate = call.rel(0);
     final RelNode input = call.rel(1);
 
-    if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0
-        || requiresStreamingAgg(aggregate)) {
+    if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
       // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
       // if there are no grouping keys
       return;
@@ -103,16 +101,6 @@ public class HashAggPrule extends AggPruleBase {
     }
   }
 
-  private boolean requiresStreamingAgg(DrillAggregateRel aggregate) {
-    //If contains ANY_VALUE aggregate, using HashAgg would not work
-    for (AggregateCall agg : aggregate.getAggCallList()) {
-      if (agg.getAggregation().getName().equalsIgnoreCase("any_value")) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> {
     final RelTrait distOnAllKeys;
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
index 494eb82..a7e8cc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashPrelUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.NamedDistributionField;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.calcite.rel.type.RelDataType;
@@ -52,13 +53,8 @@ public class HashPrelUtil {
   /**
    * Implementation of {@link HashExpressionCreatorHelper} for {@link LogicalExpression} type.
    */
-  public static HashExpressionCreatorHelper<LogicalExpression> HASH_HELPER_LOGICALEXPRESSION =
-      new HashExpressionCreatorHelper<LogicalExpression>() {
-        @Override
-        public LogicalExpression createCall(String funcName, List<LogicalExpression> inputFiled) {
-          return new FunctionCall(funcName, inputFiled, ExpressionPosition.UNKNOWN);
-        }
-      };
+  public static HashExpressionCreatorHelper<LogicalExpression> HASH_HELPER_LOGICAL_EXPRESSION =
+      (funcName, inputFiled) -> new FunctionCall(funcName, inputFiled, ExpressionPosition.UNKNOWN);
 
   public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> {
     private final RexBuilder rexBuilder;
@@ -161,7 +157,7 @@ public class HashPrelUtil {
    * @return hash expression
    */
   public static LogicalExpression getHash64Expression(LogicalExpression field, LogicalExpression seed, boolean hashAsDouble) {
-    return createHash64Expression(ImmutableList.of(field), seed, HASH_HELPER_LOGICALEXPRESSION, hashAsDouble);
+    return createHash64Expression(ImmutableList.of(field), seed, HASH_HELPER_LOGICAL_EXPRESSION, hashAsDouble);
   }
 
   /**
@@ -173,33 +169,38 @@ public class HashPrelUtil {
    * @return hash expression
    */
   public static LogicalExpression getHashExpression(LogicalExpression field, LogicalExpression seed, boolean hashAsDouble) {
-    return createHashExpression(ImmutableList.of(field), seed, HASH_HELPER_LOGICALEXPRESSION, hashAsDouble);
+    return createHashExpression(ImmutableList.of(field), seed, HASH_HELPER_LOGICAL_EXPRESSION, hashAsDouble);
   }
 
 
   /**
    * Create a distribution hash expression.
    *
-   * @param fields Distribution fields
+   * @param fields  Distribution fields
    * @param rowType Row type
-   * @return
+   * @return distribution hash expression
    */
   public static LogicalExpression getHashExpression(List<DistributionField> fields, RelDataType rowType) {
     assert fields.size() > 0;
 
-    final List<String> childFields = rowType.getFieldNames();
+    List<String> childFields = rowType.getFieldNames();
 
     // If we already included a field with hash - no need to calculate hash further down
-    if ( childFields.contains(HASH_EXPR_NAME)) {
+    if (childFields.contains(HASH_EXPR_NAME)) {
       return new FieldReference(HASH_EXPR_NAME);
     }
 
-    final List<LogicalExpression> expressions = new ArrayList<LogicalExpression>(childFields.size());
-    for(int i =0; i < fields.size(); i++){
-      expressions.add(new FieldReference(childFields.get(fields.get(i).getFieldId()), ExpressionPosition.UNKNOWN));
+    List<LogicalExpression> expressions = new ArrayList<>();
+    for (DistributionField field : fields) {
+      if (field instanceof NamedDistributionField) {
+        NamedDistributionField namedDistributionField = (NamedDistributionField) field;
+        expressions.add(new FieldReference(namedDistributionField.getFieldName(), ExpressionPosition.UNKNOWN));
+      } else {
+        expressions.add(new FieldReference(childFields.get(field.getFieldId()), ExpressionPosition.UNKNOWN));
+      }
     }
 
-    final LogicalExpression distSeed = ValueExpressions.getInt(DIST_SEED);
-    return createHashBasedPartitionExpression(expressions, distSeed, HASH_HELPER_LOGICALEXPRESSION);
+    LogicalExpression distSeed = ValueExpressions.getInt(DIST_SEED);
+    return createHashBasedPartitionExpression(expressions, distSeed, HASH_HELPER_LOGICAL_EXPRESSION);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
index 2ce6e46..9a9e11c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
@@ -20,15 +20,26 @@ package org.apache.drill.exec.planner.physical;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.MetadataAggRel;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.NamedDistributionField;
+import org.apache.drill.exec.store.parquet.FilterEvaluatorUtils.FieldReferenceFinder;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class MetadataAggPrule extends Prule {
   public static final MetadataAggPrule INSTANCE = new MetadataAggPrule();
@@ -40,21 +51,178 @@ public class MetadataAggPrule extends Prule {
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    MetadataAggRel relNode = call.rel(0);
-    RelNode input = relNode.getInput();
-
-    int groupByExprsSize = relNode.getContext().groupByExpressions().size();
-
-    // group by expressions will be returned first
-    RelCollation collation = RelCollations.of(IntStream.range(1, groupByExprsSize)
-        .mapToObj(RelFieldCollation::new)
-        .collect(Collectors.toList()));
-
-    // TODO: update DrillDistributionTrait when implemented parallelization for metadata collecting (see DRILL-7433)
-    RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
-    traits = groupByExprsSize > 0 ? traits.plus(collation) : traits;
-    RelNode convertedInput = convert(input, traits);
-    call.transformTo(
-        new MetadataAggPrel(relNode.getCluster(), traits, convertedInput, relNode.getContext()));
+    MetadataAggRel aggregate = call.rel(0);
+    RelNode input = aggregate.getInput();
+
+    int groupByExprsSize = aggregate.getContext().groupByExpressions().size();
+
+    List<RelFieldCollation> collations = new ArrayList<>();
+    List<String> names = new ArrayList<>();
+    for (int i = 0; i < groupByExprsSize; i++) {
+      collations.add(new RelFieldCollation(i + 1));
+      SchemaPath fieldPath = getArgumentReference(aggregate.getContext().groupByExpressions().get(i));
+      names.add(fieldPath.getRootSegmentPath());
+    }
+
+    RelCollation collation = new NamedRelCollation(collations, names);
+
+    RelTraitSet traits;
+
+    if (aggregate.getContext().groupByExpressions().isEmpty()) {
+      DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+      RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+
+      createTransformRequest(call, aggregate, input, singleDistTrait);
+    } else {
+      // hash distribute on all grouping keys
+      DrillDistributionTrait distOnAllKeys =
+          new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+              ImmutableList.copyOf(getDistributionFields(aggregate.getContext().groupByExpressions())));
+
+      PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+      boolean smallInput =
+          input.estimateRowCount(input.getCluster().getMetadataQuery()) < settings.getSliceTarget();
+
+      // force 2-phase aggregation for bottom aggregate call
+      // to produce sort locally before aggregation is produced for large inputs
+      if (aggregate.getContext().createNewAggregations() && !smallInput) {
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+        RelNode convertedInput = convert(input, traits);
+
+        new TwoPhaseMetadataAggSubsetTransformer(call, collation, distOnAllKeys)
+            .go(aggregate, convertedInput);
+      } else {
+        // TODO: DRILL-7433 - replace DrillDistributionTrait.SINGLETON with distOnAllKeys when parallelization for MetadataHandler is implemented
+        traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.SINGLETON);
+        createTransformRequest(call, aggregate, input, traits);
+      }
+    }
+  }
+
+  private void createTransformRequest(RelOptRuleCall call, MetadataAggRel aggregate,
+      RelNode input, RelTraitSet traits) {
+
+    RelNode convertedInput = convert(input, PrelUtil.fixTraits(call, traits));
+
+    MetadataStreamAggPrel newAgg = new MetadataStreamAggPrel(
+        aggregate.getCluster(),
+        traits,
+        convertedInput,
+        aggregate.getContext(),
+        OperatorPhase.PHASE_1of1);
+
+    call.transformTo(newAgg);
+  }
+
+  /**
+   * Returns list with named distribution fields which correspond to specified expressions arguments.
+   *
+   * @param namedExpressions expressions list
+   * @return list of {@link NamedDistributionField} instances
+   */
+  private static List<NamedDistributionField> getDistributionFields(List<NamedExpression> namedExpressions) {
+    List<NamedDistributionField> distributionFields = new ArrayList<>();
+    int groupByExprsSize = namedExpressions.size();
+
+    for (int index = 0; index < groupByExprsSize; index++) {
+      SchemaPath fieldPath = getArgumentReference(namedExpressions.get(index));
+      NamedDistributionField field =
+          new NamedDistributionField(index + 1, fieldPath.getRootSegmentPath());
+      distributionFields.add(field);
+    }
+
+    return distributionFields;
+  }
+
+  /**
+   * Returns {@link FieldReference} instance which corresponds to the argument of specified {@code namedExpression}.
+   *
+   * @param namedExpression expression
+   * @return {@link FieldReference} instance
+   */
+  private static FieldReference getArgumentReference(NamedExpression namedExpression) {
+    Set<SchemaPath> arguments = namedExpression.getExpr().accept(FieldReferenceFinder.INSTANCE, null);
+    assert arguments.size() == 1 : "Group by expression contains more than one argument";
+    return new FieldReference(arguments.iterator().next());
+  }
+
+  /**
+   * Implementation of {@link RelCollationImpl} with field name.
+   * Stores {@link RelFieldCollation} list and corresponding field names to be used in sort operators.
+   * Field name is required for the case of dynamic schema discovering
+   * when field is not present in rel data type at planning time.
+   */
+  public static class NamedRelCollation extends RelCollationImpl {
+    private final List<String> names;
+
+    protected NamedRelCollation(List<RelFieldCollation> fieldCollations, List<String> names) {
+      super(com.google.common.collect.ImmutableList.copyOf(fieldCollations));
+      this.names = Collections.unmodifiableList(names);
+    }
+
+    public String getName(int collationIndex) {
+      return names.get(collationIndex - 1);
+    }
+  }
+
+  /**
+   * {@link SubsetTransformer} for creating two-phase metadata aggregation.
+   */
+  private static class TwoPhaseMetadataAggSubsetTransformer
+      extends SubsetTransformer<MetadataAggRel, RuntimeException> {
+
+    private final RelCollation collation;
+    private final DrillDistributionTrait distributionTrait;
+
+    public TwoPhaseMetadataAggSubsetTransformer(RelOptRuleCall call,
+        RelCollation collation, DrillDistributionTrait distributionTrait) {
+      super(call);
+      this.collation = collation;
+      this.distributionTrait = distributionTrait;
+    }
+
+    @Override
+    public RelNode convertChild(MetadataAggRel aggregate, RelNode child) {
+      DrillDistributionTrait toDist = child.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+      RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, RelCollations.EMPTY, toDist);
+      RelNode newInput = convert(child, traits);
+
+      // maps group by expressions to themselves to be able to produce the second aggregation
+      List<NamedExpression> identityExpressions = aggregate.getContext().groupByExpressions().stream()
+          .map(namedExpression -> new NamedExpression(namedExpression.getExpr(), getArgumentReference(namedExpression)))
+          .collect(Collectors.toList());
+
+      // use hash aggregation for the first stage to avoid sorting raw data
+      MetadataHashAggPrel phase1Agg = new MetadataHashAggPrel(
+          aggregate.getCluster(),
+          traits,
+          newInput,
+          aggregate.getContext().toBuilder().groupByExpressions(identityExpressions).build(),
+          OperatorPhase.PHASE_1of2);
+
+      traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist).plus(distributionTrait);
+      SortPrel sort = new SortPrel(
+          aggregate.getCluster(),
+          traits,
+          phase1Agg,
+          (RelCollation) traits.getTrait(collation.getTraitDef()));
+
+      int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+      HashToMergeExchangePrel exch =
+          new HashToMergeExchangePrel(phase1Agg.getCluster(),
+              traits,
+              sort,
+              ImmutableList.copyOf(getDistributionFields(aggregate.getContext().groupByExpressions())),
+              collation,
+              numEndPoints);
+
+      return new MetadataStreamAggPrel(
+          aggregate.getCluster(),
+          newTraitSet(Prel.DRILL_PHYSICAL, collation, DrillDistributionTrait.SINGLETON),
+          exch,
+          aggregate.getContext(),
+          OperatorPhase.PHASE_2of2);
+    }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java
index 8424469..bd8beab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHandlerPrule.java
@@ -36,7 +36,7 @@ public class MetadataHandlerPrule extends Prule {
   public void onMatch(RelOptRuleCall call) {
     MetadataHandlerRel relNode = call.rel(0);
     RelNode input = relNode.getInput();
-    RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.DEFAULT);
+    RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
     RelNode convertedInput = convert(input, traits);
     call.transformTo(new MetadataHandlerPrel(relNode.getCluster(), traits,
         convertedInput,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHashAggPrel.java
similarity index 78%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHashAggPrel.java
index a50f1a8..8f454f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataHashAggPrel.java
@@ -21,11 +21,11 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataHashAggPOP;
 import org.apache.drill.exec.planner.common.DrillRelNode;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
-import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
@@ -33,19 +33,21 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
+public class MetadataHashAggPrel extends SingleRel implements DrillRelNode, Prel {
   private final MetadataAggregateContext context;
+  private final AggPrelBase.OperatorPhase phase;
 
-  public MetadataAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-      MetadataAggregateContext context) {
+  public MetadataHashAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+      MetadataAggregateContext context, AggPrelBase.OperatorPhase phase) {
     super(cluster, traits, input);
     this.context = context;
+    this.phase = phase;
   }
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     Prel child = (Prel) this.getInput();
-    MetadataAggPOP physicalOperator = new MetadataAggPOP(child.getPhysicalOperator(creator), context);
+    MetadataHashAggPOP physicalOperator = new MetadataHashAggPOP(child.getPhysicalOperator(creator), context, phase);
     return creator.addMetadata(this, physicalOperator);
   }
 
@@ -66,7 +68,7 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
 
   @Override
   public boolean needsFinalColumnReordering() {
-    return true;
+    return false;
   }
 
   @Override
@@ -77,6 +79,10 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     Preconditions.checkState(inputs.size() == 1);
-    return new MetadataAggPrel(getCluster(), traitSet, inputs.iterator().next(), context);
+    return new MetadataHashAggPrel(getCluster(), traitSet, inputs.iterator().next(), context, phase);
+  }
+
+  public AggPrelBase.OperatorPhase getPhase() {
+    return phase;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataStreamAggPrel.java
similarity index 77%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataStreamAggPrel.java
index a50f1a8..5da8917 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataStreamAggPrel.java
@@ -22,8 +22,9 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MetadataAggPOP;
+import org.apache.drill.exec.physical.config.MetadataStreamAggPOP;
 import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
 import org.apache.drill.exec.record.BatchSchema;
@@ -33,19 +34,21 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
+public class MetadataStreamAggPrel extends SingleRel implements DrillRelNode, Prel {
   private final MetadataAggregateContext context;
+  private final OperatorPhase phase;
 
-  public MetadataAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-      MetadataAggregateContext context) {
+  public MetadataStreamAggPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
+      MetadataAggregateContext context, OperatorPhase phase) {
     super(cluster, traits, input);
     this.context = context;
+    this.phase = phase;
   }
 
   @Override
   public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
     Prel child = (Prel) this.getInput();
-    MetadataAggPOP physicalOperator = new MetadataAggPOP(child.getPhysicalOperator(creator), context);
+    MetadataStreamAggPOP physicalOperator = new MetadataStreamAggPOP(child.getPhysicalOperator(creator), context, phase);
     return creator.addMetadata(this, physicalOperator);
   }
 
@@ -66,7 +69,7 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
 
   @Override
   public boolean needsFinalColumnReordering() {
-    return true;
+    return false;
   }
 
   @Override
@@ -77,6 +80,10 @@ public class MetadataAggPrel extends SingleRel implements DrillRelNode, Prel {
   @Override
   public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
     Preconditions.checkState(inputs.size() == 1);
-    return new MetadataAggPrel(getCluster(), traitSet, inputs.iterator().next(), context);
+    return new MetadataStreamAggPrel(getCluster(), traitSet, inputs.iterator().next(), context, phase);
+  }
+
+  public OperatorPhase getPhase() {
+    return phase;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index e6cba6b..4639bdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.planner.physical;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -37,22 +35,33 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 public class PrelUtil {
 
   public static List<Ordering> getOrdering(RelCollation collation, RelDataType rowType) {
-    List<Ordering> orderExpr = Lists.newArrayList();
-
-    final List<String> childFields = rowType.getFieldNames();
-
-    for (RelFieldCollation fc : collation.getFieldCollations()) {
-      FieldReference fr = new FieldReference(childFields.get(fc.getFieldIndex()), ExpressionPosition.UNKNOWN);
-      orderExpr.add(new Ordering(fc.getDirection(), fr, fc.nullDirection));
+    List<Ordering> orderExpr = new ArrayList<>();
+
+    List<String> childFields = rowType.getFieldNames();
+    Function<RelFieldCollation, String> fieldNameProvider;
+    if (collation instanceof MetadataAggPrule.NamedRelCollation) {
+      fieldNameProvider = fieldCollation -> {
+        MetadataAggPrule.NamedRelCollation namedCollation = (MetadataAggPrule.NamedRelCollation) collation;
+        return namedCollation.getName(fieldCollation.getFieldIndex());
+      };
+    } else {
+      fieldNameProvider = fieldCollation -> childFields.get(fieldCollation.getFieldIndex());
     }
 
+    collation.getFieldCollations().forEach(fieldCollation -> {
+      FieldReference fieldReference = new FieldReference(fieldNameProvider.apply(fieldCollation), ExpressionPosition.UNKNOWN);
+      orderExpr.add(new Ordering(fieldCollation.getDirection(), fieldReference, fieldCollation.nullDirection));
+    });
+
     return orderExpr;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
index c34ee99..36eae41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.util.function.CheckedSupplier;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.metastore.analyze.AnalyzeInfoProvider;
 import org.apache.drill.exec.metastore.analyze.MetadataAggregateContext;
 import org.apache.drill.exec.metastore.analyze.MetadataControllerContext;
@@ -116,12 +117,14 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
           .build(logger);
     }
 
+    ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(context.getOptions());
+
     SqlIdentifier tableIdentifier = sqlAnalyzeTable.getTableIdentifier();
     // creates select with DYNAMIC_STAR column and analyze specific columns to obtain corresponding table scan
     SqlSelect scanSql = new SqlSelect(
         SqlParserPos.ZERO,
         SqlNodeList.EMPTY,
-        getColumnList(sqlAnalyzeTable, analyzeInfoProvider),
+        getColumnList(analyzeInfoProvider.getProjectionFields(table, getMetadataType(sqlAnalyzeTable), columnNamesOptions)),
         tableIdentifier,
         null,
         null,
@@ -175,13 +178,12 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
   /**
    * Generates the column list with {@link SchemaPath#DYNAMIC_STAR} and columns required for analyze.
    */
-  private SqlNodeList getColumnList(SqlMetastoreAnalyzeTable sqlAnalyzeTable, AnalyzeInfoProvider analyzeInfoProvider) {
+  private SqlNodeList getColumnList(List<SchemaPath> projectingColumns) {
     SqlNodeList columnList = new SqlNodeList(SqlParserPos.ZERO);
     columnList.add(new SqlIdentifier(SchemaPath.DYNAMIC_STAR, SqlParserPos.ZERO));
-    MetadataType metadataLevel = getMetadataType(sqlAnalyzeTable);
-    for (SqlIdentifier field : analyzeInfoProvider.getProjectionFields(metadataLevel, context.getPlannerSettings().getOptions())) {
-      columnList.add(field);
-    }
+    projectingColumns.stream()
+        .map(segmentColumn -> new SqlIdentifier(segmentColumn.getRootSegmentPath(), SqlParserPos.ZERO))
+        .forEach(columnList::add);
     return columnList;
   }
 
@@ -219,7 +221,9 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
         .workspace(workspaceName)
         .build();
 
-    List<String> segmentColumns = analyzeInfoProvider.getSegmentColumns(table, context.getPlannerSettings().getOptions()).stream()
+    ColumnNamesOptions columnNamesOptions = new ColumnNamesOptions(context.getOptions());
+
+    List<String> segmentColumns = analyzeInfoProvider.getSegmentColumns(table, columnNamesOptions).stream()
         .map(SchemaPath::getRootSegmentPath)
         .collect(Collectors.toList());
     List<NamedExpression> segmentExpressions = segmentColumns.stream()
@@ -298,7 +302,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
           .forEach(statisticsColumns::add);
     }
 
-    SchemaPath locationField = analyzeInfoProvider.getLocationField(config.getContext().getOptions());
+    SchemaPath locationField = analyzeInfoProvider.getLocationField(columnNamesOptions);
 
     if (analyzeInfoProvider.supportsMetadataType(MetadataType.ROW_GROUP) && metadataLevel.includes(MetadataType.ROW_GROUP)) {
       MetadataHandlerContext handlerContext = MetadataHandlerContext.builder()
@@ -344,7 +348,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
             .build();
 
         convertedRelNode = getSegmentAggRelNode(segmentExpressions, convertedRelNode,
-            createNewAggregations, statisticsColumns, locationField, analyzeInfoProvider, i, handlerContext);
+            createNewAggregations, statisticsColumns, locationField, i, handlerContext);
 
         locationField = SchemaPath.getSimplePath(MetastoreAnalyzeConstants.LOCATION_FIELD);
 
@@ -409,6 +413,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
         .interestingColumns(statisticsColumns)
         .createNewAggregations(createNewAggregations)
         .excludedColumns(excludedColumns)
+        .metadataLevel(MetadataType.TABLE)
         .build();
 
     convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -424,22 +429,20 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
 
   private DrillRel getSegmentAggRelNode(List<NamedExpression> segmentExpressions, DrillRel convertedRelNode,
       boolean createNewAggregations, List<SchemaPath> statisticsColumns, SchemaPath locationField,
-      AnalyzeInfoProvider analyzeInfoProvider, int segmentLevel, MetadataHandlerContext handlerContext) {
-      SchemaPath lastModifiedTimeField =
+      int segmentLevel, MetadataHandlerContext handlerContext) {
+    SchemaPath lastModifiedTimeField =
         SchemaPath.getSimplePath(config.getContext().getOptions().getString(ExecConstants.IMPLICIT_LAST_MODIFIED_TIME_COLUMN_LABEL));
 
     List<SchemaPath> excludedColumns = Arrays.asList(lastModifiedTimeField, locationField);
 
-    List<NamedExpression> groupByExpressions = new ArrayList<>();
-    groupByExpressions.add(analyzeInfoProvider.getParentLocationExpression(locationField));
-
-    groupByExpressions.addAll(segmentExpressions);
+    List<NamedExpression> groupByExpressions = new ArrayList<>(segmentExpressions);
 
     MetadataAggregateContext aggregateContext = MetadataAggregateContext.builder()
-        .groupByExpressions(groupByExpressions.subList(0, segmentLevel + 1))
+        .groupByExpressions(groupByExpressions.subList(0, segmentLevel))
         .interestingColumns(statisticsColumns)
         .createNewAggregations(createNewAggregations)
         .excludedColumns(excludedColumns)
+        .metadataLevel(MetadataType.SEGMENT)
         .build();
 
     convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -470,6 +473,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
         .interestingColumns(statisticsColumns)
         .createNewAggregations(createNewAggregations)
         .excludedColumns(excludedColumns)
+        .metadataLevel(MetadataType.FILE)
         .build();
 
     convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -508,6 +512,7 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
         .interestingColumns(statisticsColumns)
         .createNewAggregations(createNewAggregations)
         .excludedColumns(excludedColumns)
+        .metadataLevel(MetadataType.ROW_GROUP)
         .build();
 
     convertedRelNode = new MetadataAggRel(convertedRelNode.getCluster(),
@@ -525,11 +530,10 @@ public class MetastoreAnalyzeTableHandler extends DefaultSqlHandler {
       SchemaPath locationField, String rowGroupIndexColumn, SchemaPath rgiField) {
     List<NamedExpression> rowGroupGroupByExpressions = new ArrayList<>(segmentExpressions);
     rowGroupGroupByExpressions.add(
+        new NamedExpression(locationField, FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
+    rowGroupGroupByExpressions.add(
         new NamedExpression(rgiField,
             FieldReference.getWithQuotedRef(rowGroupIndexColumn)));
-
-    rowGroupGroupByExpressions.add(
-        new NamedExpression(locationField, FieldReference.getWithQuotedRef(MetastoreAnalyzeConstants.LOCATION_FIELD)));
     return rowGroupGroupByExpressions;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index 2b2dd74..d4c45ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.metastore.ColumnNamesOptions;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -156,7 +157,7 @@ public class ColumnExplorer {
    * @param path column path
    * @return true if given column is partition, false otherwise
    */
-  public static boolean isPartitionColumn(String partitionDesignator, String path){
+  public static boolean isPartitionColumn(String partitionDesignator, String path) {
     Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
     Matcher matcher = pattern.matcher(path);
     return matcher.matches();
@@ -164,7 +165,18 @@ public class ColumnExplorer {
 
   public boolean isImplicitColumn(String name) {
     return isPartitionColumn(partitionDesignator, name) ||
-           isImplicitFileColumn(name);
+        isImplicitOrInternalFileColumn(name);
+  }
+
+  /**
+   * Checks whether given column is implicit or internal.
+   *
+   * @param name name of the column to check
+   * @return {@code true} if given column is implicit or internal, {@code false} otherwise
+   */
+  public boolean isImplicitOrInternalFileColumn(String name) {
+    return allImplicitColumns.get(name) != null
+        || allInternalColumns.get(name) != null;
   }
 
   public boolean isImplicitFileColumn(String name) {
@@ -191,14 +203,13 @@ public class ColumnExplorer {
    * Returns list with partition column names.
    * For the case when table has several levels of nesting, max level is chosen.
    *
-   * @param selection     the source of file paths
-   * @param optionManager the source of session option value for partition column label
+   * @param selection          the source of file paths
+   * @param columnNamesOptions the source of session option value for partition column label
    * @return list with partition column names.
    */
-  public static List<String> getPartitionColumnNames(FileSelection selection, OptionManager optionManager) {
+  public static List<String> getPartitionColumnNames(FileSelection selection, ColumnNamesOptions columnNamesOptions) {
 
-    String partitionColumnLabel = optionManager.getString(
-        ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+    String partitionColumnLabel = columnNamesOptions.partitionColumnNameLabel();
 
     return getPartitionColumnNames(selection, partitionColumnLabel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 50112ab..a9bff98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -31,6 +32,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.drill.common.expression.ExpressionStringBuilder;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -72,7 +74,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMetadata {
+public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMetadata<ParquetMetadataProvider> {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetGroupScan.class);
 
@@ -140,6 +142,9 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
    */
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
+    if (endpointAffinities == null) {
+      this.endpointAffinities = AffinityCreator.getAffinityMap(getRowGroupInfos());
+    }
     return endpointAffinities;
   }
 
@@ -369,11 +374,11 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
 
     return getFilterer()
         .rowGroups(prunedRowGroups)
-        .table(getTableMetadata())
-        .partitions(getPartitionsMetadata())
-        .segments(getSegmentsMetadata())
+        .table(tableMetadata)
+        .partitions(partitions)
+        .segments(segments)
         .files(qualifiedFiles)
-        .nonInterestingColumns(getNonInterestingColumnsMetadata())
+        .nonInterestingColumns(nonInterestingColumnsMetadata)
         .matching(matchAllMetadata)
         .build();
   }
@@ -433,17 +438,9 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
   }
 
   // protected methods block
-  @Override
-  protected void init() throws IOException {
-    super.init();
-
-    this.partitionColumns = metadataProvider.getPartitionColumns();
-    this.endpointAffinities = AffinityCreator.getAffinityMap(getRowGroupInfos());
-  }
-
   protected Multimap<Path, RowGroupMetadata> getRowGroupsMetadata() {
     if (rowGroups == null) {
-      rowGroups = ((ParquetMetadataProvider) metadataProvider).getRowGroupsMetadataMap();
+      rowGroups = metadataProvider.getRowGroupsMetadataMap();
     }
     return rowGroups;
   }
@@ -534,8 +531,6 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
         newScan.fileSet = new HashSet<>(newScan.getRowGroupsMetadata().keySet());
       }
 
-      newScan.endpointAffinities = AffinityCreator.getAffinityMap(newScan.getRowGroupInfos());
-
       return newScan;
     }
 
@@ -593,6 +588,16 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
 
         this.rowGroups = LinkedListMultimap.create();
         filteredRowGroups.forEach(entry -> this.rowGroups.put(entry.getPath(), entry));
+        // updates files list to include only present row groups
+        if (MapUtils.isNotEmpty(files)) {
+          files = rowGroups.keySet().stream()
+              .map(files::get)
+              .collect(Collectors.toMap(
+                  FileMetadata::getPath,
+                  Function.identity(),
+                  (o, n) -> n,
+                  LinkedHashMap::new));
+        }
       } else {
         this.rowGroups = prunedRowGroups;
         matchAllMetadata = false;
@@ -601,6 +606,67 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
     }
 
     /**
+     * Produces filtering of metadata at file level.
+     *
+     * @param optionManager     option manager
+     * @param filterPredicate   filter expression
+     * @param schemaPathsInExpr columns used in filter expression
+     */
+    @Override
+    protected void filterFileMetadata(OptionManager optionManager,
+        FilterPredicate<?> filterPredicate,
+        Set<SchemaPath> schemaPathsInExpr) {
+      Map<Path, FileMetadata> prunedFiles;
+      if (!source.getPartitionsMetadata().isEmpty()
+          && source.getPartitionsMetadata().size() > getPartitions().size()) {
+        // prunes files to leave only files which are contained by pruned partitions
+        prunedFiles = pruneForPartitions(source.getFilesMetadata(), getPartitions());
+      } else if (!source.getSegmentsMetadata().isEmpty()
+          && source.getSegmentsMetadata().size() > getSegments().size()) {
+        // prunes row groups to leave only row groups which are contained by pruned segments
+        prunedFiles = pruneForSegments(source.getFilesMetadata(), getSegments());
+      } else {
+        prunedFiles = source.getFilesMetadata();
+      }
+
+      if (isMatchAllMetadata()) {
+        files = prunedFiles;
+        return;
+      }
+
+      // files which have only single row group may be pruned when pruning row groups
+      Map<Path, FileMetadata> omittedFiles = new HashMap<>();
+
+      AbstractParquetGroupScan abstractParquetGroupScan = (AbstractParquetGroupScan) source;
+
+      Map<Path, FileMetadata> filesToFilter = new HashMap<>(prunedFiles);
+      if (!abstractParquetGroupScan.rowGroups.isEmpty()) {
+        prunedFiles.forEach((path, fileMetadata) -> {
+          if (abstractParquetGroupScan.rowGroups.get(path).size() == 1) {
+            omittedFiles.put(path, fileMetadata);
+            filesToFilter.remove(path);
+          }
+        });
+      }
+
+      // Stop files pruning for the case:
+      //    -  # of files is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      if (filesToFilter.size() <= optionManager.getOption(
+          PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
+
+        matchAllMetadata = true;
+        files = filterAndGetMetadata(schemaPathsInExpr, filesToFilter.values(), filterPredicate, optionManager).stream()
+            .collect(Collectors.toMap(FileMetadata::getPath, Function.identity()));
+
+        files.putAll(omittedFiles);
+      } else {
+        matchAllMetadata = false;
+        files = prunedFiles;
+        overflowLevel = MetadataType.FILE;
+      }
+    }
+
+    /**
      * Removes metadata which does not belong to any of segments in metadata list.
      *
      * @param metadataToPrune         list of metadata which should be pruned
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java
index 4df69d1..cc93873 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileTableMetadataProviderBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.exec.metastore.ParquetTableMetadataProvider;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.metastore.metadata.TableMetadataProviderBuilder;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
@@ -32,6 +33,9 @@ import java.util.List;
  */
 public interface ParquetFileTableMetadataProviderBuilder extends TableMetadataProviderBuilder {
 
+  @Override
+  ParquetFileTableMetadataProviderBuilder withSchema(TupleMetadata schema);
+
   ParquetFileTableMetadataProviderBuilder withEntries(List<ReadEntryWithPath> entries);
 
   ParquetFileTableMetadataProviderBuilder withSelectionRoot(Path selectionRoot);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
index 6765a20..a9ee538 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
@@ -29,7 +29,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.util.StdConverter;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -93,7 +92,7 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
    * An utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
    * to DynamicPojoRecordReader during physical plan fragment deserialization.
    */
-  public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader> {
+  public static class Converter<T> extends StdConverter<JsonNode, DynamicPojoRecordReader<T>> {
     private static final TypeReference<LinkedHashMap<String, Class<?>>> schemaType =
         new TypeReference<LinkedHashMap<String, Class<?>>>() {};
 
@@ -105,16 +104,22 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
     }
 
     @Override
-    public DynamicPojoRecordReader convert(JsonNode value) {
-      LinkedHashMap<String, Class<?>> schema = mapper.convertValue(value.get("schema"), schemaType);
-
-      ArrayList records = new ArrayList(schema.size());
-      final Iterator<JsonNode> recordsIterator = value.get("records").get(0).elements();
-      for (Class<?> fieldType : schema.values()) {
-        records.add(mapper.convertValue(recordsIterator.next(), fieldType));
+    public DynamicPojoRecordReader<T> convert(JsonNode value) {
+      LinkedHashMap<String, Class<T>> schema = mapper.convertValue(value.get("schema"), schemaType);
+      List<List<T>> records = new ArrayList<>();
+
+      JsonNode serializedRecords = value.get("records");
+      for (JsonNode serializedRecord : serializedRecords) {
+        List<T> record = new ArrayList<>(schema.size());
+        Iterator<JsonNode> recordsIterator = serializedRecord.elements();
+        for (Class<T> fieldType : schema.values()) {
+          record.add(mapper.convertValue(recordsIterator.next(), fieldType));
+        }
+        records.add(record);
       }
+
       int maxRecordsToRead = value.get("recordsPerBatch").asInt();
-      return new DynamicPojoRecordReader(schema, Collections.singletonList(records), maxRecordsToRead);
+      return new DynamicPojoRecordReader(schema, records, maxRecordsToRead);
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java
index 38b0ae5..6b703c7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunction.java
@@ -85,7 +85,7 @@ public class TestAggregateFunction extends PopUnitTestBase {
   public void testCovarianceCorrelation() throws Throwable {
     String planPath = "/functions/test_covariance.json";
     String dataPath = "/covariance_input.json";
-    Double expectedValues[] = {4.571428571428571d, 4.857142857142857d, -6.000000000000002d, 4.0d, 4.25d, -5.250000000000002d, 1.0d, 0.9274260335029677d, -1.0000000000000004d};
+    Double[] expectedValues = {4.571428571428571d, 4.857142857142857d, -6.000000000000002d, 4.0d, 4.25d, -5.250000000000002d, 1.0d, 0.9274260335029677d, -1.0000000000000004d};
 
     runTest(expectedValues, planPath, dataPath);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index f79757d..fcfd3fe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -1056,66 +1056,172 @@ public class TestAggregateFunctions extends ClusterTest {
   }
 
   @Test
-  public void testCollectList() throws Exception {
-    testBuilder()
-        .sqlQuery("select collect_list('n_nationkey', n_nationkey, " +
-            "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from (select * from cp.`tpch/nation.parquet` limit 2)")
-        .unOrdered()
-        .baselineColumns("l")
-        .baselineValues(listOf(
-            mapOf("n_nationkey", 0, "n_name", "ALGERIA",
-                "n_regionkey", 0, "n_comment", " haggle. carefully final deposits detect slyly agai"),
-            mapOf("n_nationkey", 1, "n_name", "ARGENTINA", "n_regionkey", 1,
-                "n_comment", "al foxes promise slyly according to the regular accounts. bold requests alon")))
-        .go();
+  public void testCollectListStreamAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+      testBuilder()
+          .sqlQuery("select collect_list('n_nationkey', n_nationkey, " +
+              "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l " +
+              "from (select * from cp.`tpch/nation.parquet` limit 2)")
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(listOf(
+              mapOf("n_nationkey", 0, "n_name", "ALGERIA",
+                  "n_regionkey", 0, "n_comment", " haggle. carefully final deposits detect slyly agai"),
+              mapOf("n_nationkey", 1, "n_name", "ARGENTINA", "n_regionkey", 1,
+                  "n_comment", "al foxes promise slyly according to the regular accounts. bold requests alon")))
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+    }
   }
 
   @Test
-  public void testCollectToListVarchar() throws Exception {
-    testBuilder()
-        .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
-            "(select * from cp.`store/json/clicks.json` limit 2)")
-        .unOrdered()
-        .baselineColumns("l")
-        .baselineValues(listOf("2014-04-26", "2014-04-20"))
-        .go();
+  public void testCollectListHashAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+      testBuilder()
+          .sqlQuery("select collect_list('n_nationkey', n_nationkey, " +
+              "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l " +
+              "from (select * from cp.`tpch/nation.parquet` limit 2) group by 'a'")
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(listOf(
+              mapOf("n_nationkey", 0, "n_name", "ALGERIA",
+                  "n_regionkey", 0, "n_comment", " haggle. carefully final deposits detect slyly agai"),
+              mapOf("n_nationkey", 1, "n_name", "ARGENTINA", "n_regionkey", 1,
+                  "n_comment", "al foxes promise slyly according to the regular accounts. bold requests alon")))
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+    }
   }
 
   @Test
-  public void testSchemaFunction() throws Exception {
-    TupleMetadata schema = new SchemaBuilder()
-        .add("n_nationkey", TypeProtos.MinorType.INT)
-        .add("n_name", TypeProtos.MinorType.VARCHAR)
-        .add("n_regionkey", TypeProtos.MinorType.INT)
-        .add("n_comment", TypeProtos.MinorType.VARCHAR)
-        .build();
+  public void testCollectToListVarcharStreamAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+      testBuilder()
+          .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
+              "(select * from cp.`store/json/clicks.json` limit 2)")
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(listOf("2014-04-26", "2014-04-20"))
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+    }
+  }
 
-    testBuilder()
-        .sqlQuery("select schema('n_nationkey', n_nationkey, " +
-            "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from " +
-            "(select * from cp.`tpch/nation.parquet` limit 2)")
-        .unOrdered()
-        .baselineColumns("l")
-        .baselineValues(schema.jsonString())
-        .go();
+  @Test
+  public void testCollectToListVarcharHashAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+      testBuilder()
+          .sqlQuery("select collect_to_list_varchar(`date`) as l from " +
+              "(select * from cp.`store/json/clicks.json` limit 2) group by 'a'")
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(listOf("2014-04-26", "2014-04-20"))
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+    }
   }
 
   @Test
-  public void testMergeSchemaFunction() throws Exception {
-    String schema = new SchemaBuilder()
-        .add("n_nationkey", TypeProtos.MinorType.INT)
-        .add("n_name", TypeProtos.MinorType.VARCHAR)
-        .add("n_regionkey", TypeProtos.MinorType.INT)
-        .add("n_comment", TypeProtos.MinorType.VARCHAR)
-        .build()
-        .jsonString();
+  public void testSchemaFunctionStreamAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+      TupleMetadata schema = new SchemaBuilder()
+          .add("n_nationkey", TypeProtos.MinorType.INT)
+          .add("n_name", TypeProtos.MinorType.VARCHAR)
+          .add("n_regionkey", TypeProtos.MinorType.INT)
+          .add("n_comment", TypeProtos.MinorType.VARCHAR)
+          .build();
 
-    testBuilder()
-        .sqlQuery("select merge_schema('%s') as l from " +
-            "(select * from cp.`tpch/nation.parquet` limit 2)", schema)
-        .unOrdered()
-        .baselineColumns("l")
-        .baselineValues(schema)
-        .go();
+      testBuilder()
+          .sqlQuery("select schema('n_nationkey', n_nationkey, " +
+              "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from " +
+              "(select * from cp.`tpch/nation.parquet` limit 2)")
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(schema.jsonString())
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+    }
+  }
+
+  @Test
+  public void testSchemaFunctionHashAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+      TupleMetadata schema = new SchemaBuilder()
+          .add("n_nationkey", TypeProtos.MinorType.INT)
+          .add("n_name", TypeProtos.MinorType.VARCHAR)
+          .add("n_regionkey", TypeProtos.MinorType.INT)
+          .add("n_comment", TypeProtos.MinorType.VARCHAR)
+          .build();
+
+      testBuilder()
+          .sqlQuery("select schema('n_nationkey', n_nationkey, " +
+              "'n_name', n_name, 'n_regionkey', n_regionkey, 'n_comment', n_comment) as l from " +
+              "(select * from cp.`tpch/nation.parquet` limit 2) group by 'a'")
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(schema.jsonString())
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+    }
+  }
+
+  @Test
+  public void testMergeSchemaFunctionStreamAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+      String schema = new SchemaBuilder()
+          .add("n_nationkey", TypeProtos.MinorType.INT)
+          .add("n_name", TypeProtos.MinorType.VARCHAR)
+          .add("n_regionkey", TypeProtos.MinorType.INT)
+          .add("n_comment", TypeProtos.MinorType.VARCHAR)
+          .build()
+          .jsonString();
+
+      testBuilder()
+          .sqlQuery("select merge_schema('%s') as l from " +
+              "(select * from cp.`tpch/nation.parquet` limit 2)", schema)
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(schema)
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+    }
+  }
+
+  @Test
+  public void testMergeSchemaFunctionHashAgg() throws Exception {
+    try {
+      client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+      String schema = new SchemaBuilder()
+          .add("n_nationkey", TypeProtos.MinorType.INT)
+          .add("n_name", TypeProtos.MinorType.VARCHAR)
+          .add("n_regionkey", TypeProtos.MinorType.INT)
+          .add("n_comment", TypeProtos.MinorType.VARCHAR)
+          .build()
+          .jsonString();
+
+      testBuilder()
+          .sqlQuery("select merge_schema('%s') as l from " +
+              "(select * from cp.`tpch/nation.parquet` limit 2) group by 'a'", schema)
+          .unOrdered()
+          .baselineColumns("l")
+          .baselineValues(schema)
+          .go();
+    } finally {
+      client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+    }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
index 850b49d..a1cd2e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAggWithAnyValue.java
@@ -18,22 +18,29 @@
 
 package org.apache.drill.exec.physical.impl.agg;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.OperatorTest;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.drill.test.TestBuilder;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.drill.test.TestBuilder.listOf;
+import static org.apache.drill.test.TestBuilder.mapOf;
+
 @Category(OperatorTest.class)
 @RunWith(Enclosed.class)
 public class TestAggWithAnyValue {
@@ -43,7 +50,7 @@ public class TestAggWithAnyValue {
     @Test
     public void testStreamAggWithGroupBy() {
       StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("age.`max`", "age"), parseExprs("any_value(a)", "any_a"));
-      List<String> inputJsonBatches = Lists.newArrayList(
+      List<String> inputJsonBatches = Arrays.asList(
           "[{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"San Bruno\", \"de\": \"987654321987654321987654321.10987654321\"," +
               " \"a\": [{\"b\":50, \"c\":30},{\"b\":70, \"c\":40}], \"m\": [{\"n\": [10, 11, 12]}], \"f\": [{\"g\": {\"h\": [{\"k\": 70}, {\"k\": 80}]}}]," +
               "\"p\": {\"q\": [21, 22, 23]}" + "}, " +
@@ -63,87 +70,258 @@ public class TestAggWithAnyValue {
           .physicalOperator(aggConf)
           .inputDataStreamJson(inputJsonBatches)
           .baselineColumns("age", "any_a")
-          .baselineValues(60l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)))
-          .baselineValues(80l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)))
+          .baselineValues(60L,
+              listOf(
+                  mapOf("b", 50L, "c", 30L),
+                  mapOf("b", 70L, "c", 40L)))
+          .baselineValues(80L,
+              listOf(
+                  mapOf("b", 10L, "c", 15L),
+                  mapOf("b", 20L, "c", 45L)))
+          .go();
+    }
+
+    @Test
+    public void testHashAggWithGroupBy() {
+      HashAggregate aggConf = new HashAggregate(null,
+          OperatorPhase.PHASE_1of1,
+          parseExprs("age.`max`", "age"),
+          parseExprs("any_value(a)", "any_a"),
+          1F);
+      List<String> inputJsonBatches = Arrays.asList(
+          "[{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"San Bruno\", \"de\": \"987654321987654321987654321.10987654321\"," +
+              " \"a\": [{\"b\":50, \"c\":30},{\"b\":70, \"c\":40}], \"m\": [{\"n\": [10, 11, 12]}], \"f\": [{\"g\": {\"h\": [{\"k\": 70}, {\"k\": 80}]}}]," +
+              "\"p\": {\"q\": [21, 22, 23]}}, " +
+              "{ \"age\": {\"min\":20, \"max\":60}, \"city\": \"Castro Valley\", \"de\": \"987654321987654321987654321.12987654321\"," +
+              " \"a\": [{\"b\":60, \"c\":40},{\"b\":80, \"c\":50}], \"m\": [{\"n\": [13, 14, 15]}], \"f\": [{\"g\": {\"h\": [{\"k\": 90}, {\"k\": 100}]}}]," +
+              "\"p\": {\"q\": [24, 25, 26]}}]",
+          "[{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.00987654321\"," +
+              " \"a\": [{\"b\":10, \"c\":15}, {\"b\":20, \"c\":45}], \"m\": [{\"n\": [1, 2, 3]}], \"f\": [{\"g\": {\"h\": [{\"k\": 10}, {\"k\": 20}]}}]," +
+              "\"p\": {\"q\": [27, 28, 29]}}, " +
+              "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"San Carlos\", \"de\": \"987654321987654321987654321.11987654321\"," +
+              " \"a\": [{\"b\":30, \"c\":25}, {\"b\":40, \"c\":55}], \"m\": [{\"n\": [4, 5, 6]}], \"f\": [{\"g\": {\"h\": [{\"k\": 30}, {\"k\": 40}]}}]," +
+              "\"p\": {\"q\": [30, 31, 32]}}, " +
+              "{ \"age\": {\"min\":43, \"max\":80}, \"city\": \"Palo Alto\", \"de\": \"987654321987654321987654321.13987654321\"," +
+              " \"a\": [{\"b\":70, \"c\":85}, {\"b\":90, \"c\":145}], \"m\": [{\"n\": [7, 8, 9]}], \"f\": [{\"g\": {\"h\": [{\"k\": 50}, {\"k\": 60}]}}]," +
+              "\"p\": {\"q\": [33, 34, 35]}}]");
+      legacyOpTestBuilder()
+          .physicalOperator(aggConf)
+          .inputDataStreamJson(inputJsonBatches)
+          .baselineColumns("age", "any_a")
+          .baselineValues(60L,
+              listOf(
+                  mapOf("b", 50L, "c", 30L),
+                  mapOf("b", 70L, "c", 40L)))
+          .baselineValues(80L,
+              listOf(
+                  mapOf("b", 10L, "c", 15L),
+                  mapOf("b", 20L, "c", 45L)))
           .go();
     }
   }
 
-  public static class TestAggWithAnyValueSingleBatch extends BaseTestQuery {
+  public static class TestAggWithAnyValueSingleBatch extends ClusterTest {
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+      startCluster(ClusterFixture.builder(dirTestWatcher));
+    }
 
     @Test
-    public void testWithGroupBy() throws Exception {
-      String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " +
-          "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from  cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
-      testBuilder()
-          .sqlQuery(query)
-          .unOrdered()
-          .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
-          .baselineValues(60l, 2l, TestBuilder.listOf(TestBuilder.mapOf("b", 50l, "c", 30l), TestBuilder.mapOf("b", 70l, "c", 40l)), "San Bruno",
-              TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 70l), TestBuilder.mapOf("k", 80l))))),
-              TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(10l, 11l, 12l))),
-              TestBuilder.mapOf("q", TestBuilder.listOf(21l, 22l, 23l)))
-          .baselineValues(80l, 3l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto",
-              TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))),
-              TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))),
-              TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l)))
-          .go();
+    public void testWithGroupByStreamAgg() throws Exception {
+      String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a," +
+          "any_value(t1.city) as any_city, any_value(f) as any_f, any_value(m) as any_m," +
+          "any_value(p) as any_p from  cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+
+      try {
+        client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
+            .baselineValues(60L, 2L,
+                listOf(
+                    mapOf("b", 50L, "c", 30L),
+                    mapOf("b", 70L, "c", 40L)),
+                "San Bruno",
+                listOf(
+                    mapOf("g",
+                        mapOf("h",
+                            listOf(mapOf("k", 70L), mapOf("k", 80L))))),
+                listOf(mapOf("n", listOf(10L, 11L, 12L))),
+                mapOf("q", listOf(21L, 22L, 23L)))
+            .baselineValues(80L, 3L,
+                listOf(
+                    mapOf("b", 10L, "c", 15L),
+                    mapOf("b", 20L, "c", 45L)),
+                "Palo Alto",
+                listOf(mapOf("g",
+                    mapOf("h", listOf(mapOf("k", 10L), mapOf("k", 20L))))),
+                listOf(mapOf("n", listOf(1L, 2L, 3L))),
+                mapOf("q", listOf(27L, 28L, 29L)))
+            .go();
+      } finally {
+        client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+      }
+    }
+
+    @Test
+    public void testWithGroupByHashAgg() throws Exception {
+      String query = "select t1.age.`max` as age, count(*) as cnt, any_value(t1.a) as any_a," +
+          "any_value(t1.city) as any_city, any_value(f) as any_f, any_value(m) as any_m," +
+          "any_value(p) as any_p from  cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+      try {
+        client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("age", "cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
+            .baselineValues(60L, 2L,
+                listOf(
+                    mapOf("b", 50L, "c", 30L),
+                    mapOf("b", 70L, "c", 40L)),
+                "San Bruno",
+                listOf(
+                    mapOf("g",
+                        mapOf("h",
+                            listOf(mapOf("k", 70L), mapOf("k", 80L))))),
+                listOf(mapOf("n", listOf(10L, 11L, 12L))),
+                mapOf("q", listOf(21L, 22L, 23L)))
+            .baselineValues(80L, 3L,
+                listOf(
+                    mapOf("b", 10L, "c", 15L),
+                    mapOf("b", 20L, "c", 45L)),
+                "Palo Alto",
+                listOf(mapOf("g",
+                    mapOf("h", listOf(mapOf("k", 10L), mapOf("k", 20L))))),
+                listOf(mapOf("n", listOf(1L, 2L, 3L))),
+                mapOf("q", listOf(27L, 28L, 29L)))
+            .go();
+      } finally {
+        client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+      }
     }
 
     @Test
     public void testWithoutGroupBy() throws Exception {
       String query = "select count(*) as cnt, any_value(t1.a) as any_a, any_value(t1.city) as any_city, " +
-          "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p from  cp.`store/json/test_anyvalue.json` t1";
+          "any_value(f) as any_f, any_value(m) as any_m, any_value(p) as any_p " +
+          "from cp.`store/json/test_anyvalue.json` t1";
       testBuilder()
           .sqlQuery(query)
           .unOrdered()
           .baselineColumns("cnt", "any_a", "any_city", "any_f", "any_m", "any_p")
-          .baselineValues(5l, TestBuilder.listOf(TestBuilder.mapOf("b", 10l, "c", 15l), TestBuilder.mapOf("b", 20l, "c", 45l)), "Palo Alto",
-              TestBuilder.listOf(TestBuilder.mapOf("g", TestBuilder.mapOf("h", TestBuilder.listOf(TestBuilder.mapOf("k", 10l), TestBuilder.mapOf("k", 20l))))),
-              TestBuilder.listOf(TestBuilder.mapOf("n", TestBuilder.listOf(1l, 2l, 3l))),
-              TestBuilder.mapOf("q", TestBuilder.listOf(27l, 28l, 29l)))
+          .baselineValues(5L,
+              listOf(
+                  mapOf("b", 10L, "c", 15L),
+                  mapOf("b", 20L, "c", 45L)),
+              "Palo Alto",
+              listOf(mapOf("g", mapOf("h", listOf(mapOf("k", 10L), mapOf("k", 20L))))),
+              listOf(mapOf("n", listOf(1L, 2L, 3L))),
+              mapOf("q", listOf(27L, 28L, 29L)))
           .go();
     }
 
     @Test
-    public void testDecimalWithGroupBy() throws Exception {
-      String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
-          "from  cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
-      testBuilder()
-          .sqlQuery(query)
-          .unOrdered()
-          .baselineColumns("age", "any_decimal")
-          .baselineValues(60l, new BigDecimal("987654321987654321987654321.10987654321"))
-          .baselineValues(80l, new BigDecimal("987654321987654321987654321.00987654321"))
-          .go();
+    public void testDecimalWithGroupByStreamAgg() throws Exception {
+      try {
+        client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+        String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
+            "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("age", "any_decimal")
+            .baselineValues(60L, new BigDecimal("987654321987654321987654321.10987654321"))
+            .baselineValues(80L, new BigDecimal("987654321987654321987654321.00987654321"))
+            .go();
+      } finally {
+        client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+      }
     }
 
     @Test
-    public void testRepeatedDecimalWithGroupBy() throws Exception {
-      JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
-      ints.add(new BigDecimal("999999.999"));
-      ints.add(new BigDecimal("-999999.999"));
-      ints.add(new BigDecimal("0.000"));
-
-      JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
-      longs.add(new BigDecimal("999999999.999999999"));
-      longs.add(new BigDecimal("-999999999.999999999"));
-      longs.add(new BigDecimal("0.000000000"));
-
-      JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
-      fixedLen.add(new BigDecimal("999999999999.999999"));
-      fixedLen.add(new BigDecimal("-999999999999.999999"));
-      fixedLen.add(new BigDecimal("0.000000"));
-
-      String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
-          " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
-          " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`";
-      testBuilder()
-          .sqlQuery(query)
-          .unOrdered()
-          .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
-          .baselineValues(ints, longs, fixedLen, fixedLen)
-          .go();
+    public void testDecimalWithGroupByHashAgg() throws Exception {
+      try {
+        client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+        String query = "select t1.age.`max` as age, any_value(cast(t1.de as decimal(38, 11))) as any_decimal " +
+            "from cp.`store/json/test_anyvalue.json` t1 group by t1.age.`max`";
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("age", "any_decimal")
+            .baselineValues(60L, new BigDecimal("987654321987654321987654321.10987654321"))
+            .baselineValues(80L, new BigDecimal("987654321987654321987654321.00987654321"))
+            .go();
+      } finally {
+        client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+      }
+    }
+
+    @Test
+    public void testRepeatedDecimalWithGroupByStreamAgg() throws Exception {
+      try {
+        client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
+        JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+        ints.add(new BigDecimal("999999.999"));
+        ints.add(new BigDecimal("-999999.999"));
+        ints.add(new BigDecimal("0.000"));
+
+        JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+        longs.add(new BigDecimal("999999999.999999999"));
+        longs.add(new BigDecimal("-999999999.999999999"));
+        longs.add(new BigDecimal("0.000000000"));
+
+        JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+        fixedLen.add(new BigDecimal("999999999999.999999"));
+        fixedLen.add(new BigDecimal("-999999999999.999999"));
+        fixedLen.add(new BigDecimal("0.000000"));
+
+        String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
+            " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
+            " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet` group by 'a'";
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
+            .baselineValues(ints, longs, fixedLen, fixedLen)
+            .go();
+      } finally {
+        client.resetSession(PlannerSettings.HASHAGG.getOptionName());
+      }
+    }
+
+    @Test
+    public void testRepeatedDecimalWithGroupByHashAgg() throws Exception {
+      try {
+        client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
+        JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+        ints.add(new BigDecimal("999999.999"));
+        ints.add(new BigDecimal("-999999.999"));
+        ints.add(new BigDecimal("0.000"));
+
+        JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+        longs.add(new BigDecimal("999999999.999999999"));
+        longs.add(new BigDecimal("-999999999.999999999"));
+        longs.add(new BigDecimal("0.000000000"));
+
+        JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+        fixedLen.add(new BigDecimal("999999999999.999999"));
+        fixedLen.add(new BigDecimal("-999999999999.999999"));
+        fixedLen.add(new BigDecimal("0.000000"));
+
+        String query = "select any_value(decimal_int32) as any_dec_32, any_value(decimal_int64) as any_dec_64," +
+            " any_value(decimal_fixedLen) as any_dec_fixed, any_value(decimal_binary) as any_dec_bin" +
+            " from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet` group by 'a'";
+        testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("any_dec_32", "any_dec_64", "any_dec_fixed", "any_dec_bin")
+            .baselineValues(ints, longs, fixedLen, fixedLen)
+            .go();
+      } finally {
+        client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
+      }
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java
index fa60562..910039f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggEmitOutcome.java
@@ -42,7 +42,8 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
 
 @Category(OperatorTest.class)
 public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
@@ -73,11 +74,11 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    * @param outputRowCounts - expected number of rows, in each output batch
    * @param outputOutcomes - the expected output outcomes
    */
-  private void testHashAggrEmit(int inp2_1[], int inp2_2[], String inp2_3[],  // first input batch
-                                int inp3_1[], int inp3_2[], String inp3_3[],  // second input batch
-                                String exp1_1[], int exp1_2[],            // first expected
-                                String exp2_1[], int exp2_2[],            // second expected
-                                int inpRowSet[], RecordBatch.IterOutcome inpOutcomes[],  // input batches + outcomes
+  private void testHashAggrEmit(int[] inp2_1, int[] inp2_2, String[] inp2_3,  // first input batch
+                                int[] inp3_1, int[] inp3_2, String[] inp3_3,  // second input batch
+                                String[] exp1_1, int[] exp1_2,            // first expected
+                                String[] exp2_1, int[] exp2_2,            // second expected
+                                int[] inpRowSet, RecordBatch.IterOutcome[] inpOutcomes,  // input batches + outcomes
                                 List<Integer> outputRowCounts,  // output row counts per each out batch
                                 List<RecordBatch.IterOutcome> outputOutcomes) // output outcomes
   {
@@ -134,12 +135,11 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
         case 3: inputContainer.add(nonEmptyInputRowSet3.container());
           break;
         default:
-          assertTrue(false);
+          fail();
       }
     }
-    for (RecordBatch.IterOutcome out : inpOutcomes) {  // build the outcomes
-      inputOutcomes.add(out);
-    }
+    // build the outcomes
+    inputOutcomes.addAll(Arrays.asList(inpOutcomes));
 
     //
     //  Build the Hash Agg Batch operator
@@ -158,20 +158,21 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     //
     //  Iterate thru the next batches, and verify expected outcomes
     //
-    assertTrue( outputRowCounts.size() == outputOutcomes.size());
+    assertEquals(outputRowCounts.size(), outputOutcomes.size());
     boolean firstOne = true;
 
-    for (int ind = 0; ind < outputOutcomes.size(); ind++ ) {
+    for (int ind = 0; ind < outputOutcomes.size(); ind++) {
       RecordBatch.IterOutcome expOut = outputOutcomes.get(ind);
-      assertTrue(haBatch.next() == expOut );
-      if ( expOut == NONE ) { break; } // done
+      assertSame(expOut, haBatch.next());
+      if (expOut == NONE) {
+        break;
+      } // done
       RowSet actualRowSet = DirectRowSet.fromContainer(haBatch.getContainer());
       int expectedSize = outputRowCounts.get(ind);
       // System.out.println(expectedSize);
-      if ( 0 == expectedSize ) {
+      if (0 == expectedSize) {
         assertEquals(expectedSize, haBatch.getRecordCount());
-      }
-      else if ( firstOne ) {
+      } else if (firstOne) {
         firstOne = false;
         new RowSetComparison(expectedRowSet1).verify(actualRowSet);
       } else {
@@ -197,8 +198,8 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrWithEmptyDataSet() {
-    int inpRowSet[] = {0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA};
+    int[] inpRowSet = {0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, NONE);
@@ -213,8 +214,8 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrEmptyBatchEmitOutcome() {
-    int inpRowSet[] = {0, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT};
+    int[] inpRowSet = {0, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 0, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -230,15 +231,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrNonEmptyBatchEmitOutcome() {
-    int inp2_1[]         = { 2,       2,       13,       13,      4};
-    int inp2_2[]         = {20,      20,      130,      130,     40};
-    String inp2_3[] = {"item2", "item2", "item13", "item13", "item4"};
+    int[] inp2_1 = {2, 2, 13, 13, 4};
+    int[] inp2_2 = {20, 20, 130, 130, 40};
+    String[] inp2_3 = {"item2", "item2", "item13", "item13", "item4"};
 
-    String exp1_1[] = {"item2", "item13", "item4"};
-    int exp1_2[]    = {     44,      286,     44};
+    String[] exp1_1 = {"item2", "item13", "item4"};
+    int[] exp1_2 = {44, 286, 44};
 
-    int inpRowSet[] = {0, 2};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT};
+    int[] inpRowSet = {0, 2};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 3, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -252,15 +253,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
-    int inp2_1[]          = {2,       13,       4,       0,        0,     0};
-    int inp2_2[]         = {20,      130,      40,    2000,     1300,  4000};
-    String inp2_3[] = {"item2", "item13", "item4", "item2", "item13", "item4"};
+    int[] inp2_1 = {2, 13, 4, 0, 0, 0};
+    int[] inp2_2 = {20, 130, 40, 2000, 1300, 4000};
+    String[] inp2_3 = {"item2", "item13", "item4", "item2", "item13", "item4"};
 
-    String exp1_1[] = {"item2", "item13", "item4"};
-    int exp1_2[]       = {2022,     1443,   4044};
+    String[] exp1_1 = {"item2", "item13", "item4"};
+    int[] exp1_2 = {2022, 1443, 4044};
 
-    int inpRowSet[] = {0, 0, 2};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, EMIT};
+    int[] inpRowSet = {0, 0, 2};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 0, 3, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, NONE);
@@ -274,15 +275,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
-    int inp2_1[]          = {2,       13,       4,       0,       1,        0,    1};
-    int inp2_2[]         = {20,      130,      40,       0,   11000,        0,   33000};
-    String inp2_3[] = {"item2", "item13", "item4", "item2", "item2", "item13", "item13"};
+    int[] inp2_1 = {2, 13, 4, 0, 1, 0, 1};
+    int[] inp2_2 = {20, 130, 40, 0, 11000, 0, 33000};
+    String[] inp2_3 = {"item2", "item13", "item4", "item2", "item2", "item13", "item13"};
 
-    String exp1_1[] = {"item2", "item13", "item4"};
-    int exp1_2[]      = {11023,    33144,     44};
+    String[] exp1_1 = {"item2", "item13", "item4"};
+    int[] exp1_2 = {11023, 33144, 44};
 
-    int inpRowSet[] = {0, 0, 0, 0, 2};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT, EMIT};
+    int[] inpRowSet = {0, 0, 0, 0, 2};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 0, 0, 0, 3, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, EMIT, EMIT, NONE);
@@ -300,18 +301,18 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAgrResetsAfterFirstEmitOutcome() {
-    int inp2_1[]          = {2,       3,       3,       3,       3,   3,   3,   3,   3,   3,   3, 2 };
-    int inp2_2[]         = {20,      30,      30,      30,      30,  30,  30,  30,  30,  30,  30, 20  };
-    String inp2_3[] = {"item2", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item2"};
+    int[] inp2_1 = {2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2};
+    int[] inp2_2 = {20, 30, 30, 30, 30, 30, 30, 30, 30, 30, 30, 20};
+    String[] inp2_3 = {"item2", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item3", "item2"};
 
-    String exp1_1[] = {"item1"};
-    int exp1_2[]      = {11};
+    String[] exp1_1 = {"item1"};
+    int[] exp1_2 = {11};
 
-    String exp2_1[] = {"item2", "item3"};
-    int exp2_2[]        = {44,      330};
+    String[] exp2_1 = {"item2", "item3"};
+    int[] exp2_2 = {44, 330};
 
-    int inpRowSet[] = {1, 0, 2, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, OK, EMIT};
+    int[] inpRowSet = {1, 0, 2, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, OK, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 1, 2, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, NONE);
@@ -327,11 +328,11 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
   @Test
   public void testHashAggr_NonEmptyFirst_EmptyOKEmitOutcome() {
 
-    String exp1_1[] = {"item1"};
-    int exp1_2[]      = {11};
+    String[] exp1_1 = {"item1"};
+    int[] exp1_2 = {11};
 
-    int inpRowSet[] = {1, 0, 0, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, EMIT, NONE};
+    int[] inpRowSet = {1, 0, 0, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, EMIT, NONE};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 1, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -349,18 +350,18 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrMultipleOutputBatch() {
-    int inp2_1[]          = {4,       2,       5,       3,       5,      4};
-    int inp2_2[]         = {40,      20,      50,      30,      50,     40};
-    String inp2_3[] = {"item4", "item2", "item5", "item3", "item5", "item4"};
+    int[] inp2_1 = {4, 2, 5, 3, 5, 4};
+    int[] inp2_2 = {40, 20, 50, 30, 50, 40};
+    String[] inp2_3 = {"item4", "item2", "item5", "item3", "item5", "item4"};
 
-    String exp1_1[] = {"item1"};
-    int exp1_2[]      = {11};
+    String[] exp1_1 = {"item1"};
+    int[] exp1_2 = {11};
 
-    String exp2_1[] = {"item4", "item2", "item5", "item3"};
-    int exp2_2[]      = {   88,      22,     110,     33};
+    String[] exp2_1 = {"item4", "item2", "item5", "item3"};
+    int[] exp2_2 = {88, 22, 110, 33};
 
-    int inpRowSet[] = {1, 0, 2};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, OK};
+    int[] inpRowSet = {1, 0, 2};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, OK};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 1, 4, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, OK, NONE);
@@ -374,18 +375,18 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrMultipleEMITOutcome() {
-    int inp2_1[]          = {2,       3};
-    int inp2_2[]         = {20,      30};
-    String inp2_3[] = {"item2", "item3"};
+    int[] inp2_1 = {2, 3};
+    int[] inp2_2 = {20, 30};
+    String[] inp2_3 = {"item2", "item3"};
 
-    String exp1_1[] = {"item1"};
-    int exp1_2[]      = {11};
+    String[] exp1_1 = {"item1"};
+    int[] exp1_2 = {11};
 
-    String exp2_1[] = {"item2", "item3"};
-    int exp2_2[]      = {   22,      33};
+    String[] exp2_1 = {"item2", "item3"};
+    int[] exp2_2 = {22, 33};
 
-    int inpRowSet[] = {1, 0, 2, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT};
+    int[] inpRowSet = {1, 0, 2, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, EMIT, EMIT, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 1, 2, 0, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, EMIT, NONE);
@@ -399,15 +400,15 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrMultipleInputToSingleOutputBatch() {
-    int inp2_1[]          = {2};
-    int inp2_2[]         = {20};
-    String inp2_3[] = {"item2"};
+    int[] inp2_1 = {2};
+    int[] inp2_2 = {20};
+    String[] inp2_3 = {"item2"};
 
-    String exp1_1[] = {"item1", "item2"};
-    int exp1_2[]      = {   11,    22};
+    String[] exp1_1 = {"item1", "item2"};
+    int[] exp1_2 = {   11,    22};
 
-    int inpRowSet[] = {1, 0, 2, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, OK, EMIT};
+    int[] inpRowSet = {1, 0, 2, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, OK, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 2, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, NONE);
@@ -421,22 +422,22 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggrMultipleInputToMultipleOutputBatch() {
-    int inp2_1[]          = {7,       2,       7,       3};
-    int inp2_2[]         = {70,      20,      70,      33};
-    String inp2_3[] = {"item7", "item1", "item7", "item3"};
+    int[] inp2_1 = {7, 2, 7, 3};
+    int[] inp2_2 = {70, 20, 70, 33};
+    String[] inp2_3 = {"item7", "item1", "item7", "item3"};
 
-    int inp3_1[]          = {17,       7,       3,       13,       9,       13};
-    int inp3_2[]         = {170,      71,      30,      130,     123,      130};
-    String inp3_3[] = {"item17", "item7", "item3", "item13", "item3", "item13"};
+    int[] inp3_1 = {17, 7, 3, 13, 9, 13};
+    int[] inp3_2 = {170, 71, 30, 130, 123, 130};
+    String[] inp3_3 = {"item17", "item7", "item3", "item13", "item3", "item13"};
 
-    String exp1_1[] = {"item1", "item7", "item3"};
-    int exp1_2[]      = {   33,     154,      36};
+    String[] exp1_1 = {"item1", "item7", "item3"};
+    int[] exp1_2 = {33, 154, 36};
 
-    String exp2_1[] = {"item17", "item7", "item3", "item13"};
-    int exp2_2[]      = {   187,      78,     165,      286};
+    String[] exp2_1 = {"item17", "item7", "item3", "item13"};
+    int[] exp2_2 = {187, 78, 165, 286};
 
-    int inpRowSet[] = {1, 0, 2, 0, 3, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, EMIT, OK, OK, EMIT};
+    int[] inpRowSet = {1, 0, 2, 0, 3, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, EMIT, OK, OK, EMIT};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 3, 4, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, EMIT, EMIT, NONE);
@@ -452,19 +453,19 @@ public class TestHashAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
    */
   @Test
   public void testHashAggr_WithEmptyNonEmptyBatchesAndOKOutcome() {
-    int inp2_1[]    = {      2,       7,       3,       13,       13,      13};
-    int inp2_2[]    = {     20,      70,      33,      130,      130,     130};
-    String inp2_3[] = {"item1", "item7", "item3", "item13", "item13", "item13"};
+    int[] inp2_1 = {2, 7, 3, 13, 13, 13};
+    int[] inp2_2 = {20, 70, 33, 130, 130, 130};
+    String[] inp2_3 = {"item1", "item7", "item3", "item13", "item13", "item13"};
 
-    int inp3_1[]    = {     17,       23,       130,        0};
-    int inp3_2[]    = {    170,      230,      1300,        0};
-    String inp3_3[] = {"item7", "item23", "item130", "item130"};
+    int[] inp3_1 = {17, 23, 130, 0};
+    int[] inp3_2 = {170, 230, 1300, 0};
+    String[] inp3_3 = {"item7", "item23", "item130", "item130"};
 
-    String exp1_1[] = {"item1", "item7", "item3", "item13", "item23", "item130"};
-    int exp1_2[]    = {     33,     264,      36,      429,      253,      1430};
+    String[] exp1_1 = {"item1", "item7", "item3", "item13", "item23", "item130"};
+    int[] exp1_2 = {33, 264, 36, 429, 253, 1430};
 
-    int inpRowSet[] = {1, 0, 2, 0, 3, 0};
-    RecordBatch.IterOutcome inpOutcomes[] = {OK_NEW_SCHEMA, OK, OK, OK, OK, OK};
+    int[] inpRowSet = {1, 0, 2, 0, 3, 0};
+    RecordBatch.IterOutcome[] inpOutcomes = {OK_NEW_SCHEMA, OK, OK, OK, OK, OK};
 
     List<Integer> outputRowCounts = Arrays.asList(0, 6, 0);
     List<RecordBatch.IterOutcome> outputOutcomes = Arrays.asList(OK_NEW_SCHEMA, OK, NONE);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index e1ece37..c039d34 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -224,7 +224,12 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.tmp.`%s` as\n" +
           "select * from cp.`tpch/region.parquet`", tableName);
 
-      run("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       String query = "select mykey from dfs.tmp.`%s` where mykey is null";
 
@@ -361,7 +366,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -500,7 +510,12 @@ public class TestMetastoreCommands extends ClusterTest {
           .build();
 
       try {
-        run("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name());
+        testBuilder()
+            .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name())
+            .unOrdered()
+            .baselineColumns("ok", "summary")
+            .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+            .go();
 
         BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
             .getMetastoreRegistry()
@@ -530,7 +545,12 @@ public class TestMetastoreCommands extends ClusterTest {
 
     for (MetadataType analyzeLevel : analyzeLevels) {
       try {
-        run("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name());
+        testBuilder()
+            .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA '%s' level", tableName, analyzeLevel.name())
+            .unOrdered()
+            .baselineColumns("ok", "summary")
+            .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+            .go();
 
         List<String> emptyMetadataLevels = Arrays.stream(MetadataType.values())
             .filter(metadataType -> metadataType.compareTo(analyzeLevel) > 0
@@ -594,7 +614,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -639,7 +664,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.tmp.`%s` columns NONE REFRESH METADATA 'row_group' LEVEL", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns NONE REFRESH METADATA 'row_group' LEVEL", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -688,7 +718,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -753,7 +788,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus) REFRESH METADATA 'row_group' LEVEL", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -826,7 +866,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` columns(o_orderstatus, o_orderdate) REFRESH METADATA 'row_group' LEVEL", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -1658,7 +1703,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA 'file' LEVEL", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.tmp.`%s` REFRESH METADATA 'file' LEVEL", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -1795,7 +1845,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -1883,7 +1938,12 @@ public class TestMetastoreCommands extends ClusterTest {
         .build();
 
     try {
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       BaseTableMetadata actualTableMetadata = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -1905,7 +1965,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       String query =
           "select dir0, dir1, o_custkey, o_orderdate from dfs.tmp.`%s`\n" +
@@ -1936,7 +2001,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query =
           "select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -1968,20 +2038,25 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.%s (o_orderdate, o_orderpriority) partition by (o_orderpriority)\n"
           + "as select o_orderdate, o_orderpriority from dfs.`multilevel/parquet/1994/Q1`", tableName);
 
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query = "select * from dfs.%s where o_orderpriority = '1-URGENT'";
       long expectedRowCount = 3;
-      int expectedNumFiles = 1;
 
       long actualRowCount = queryBuilder().sql(query, tableName).run().recordCount();
       assertEquals(expectedRowCount, actualRowCount);
-      String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetastore=true";
 
+      // do not match expected files number since CTAS may create
+      // different files number due to small planner.slice_target value
       queryBuilder().sql(query, tableName)
           .planMatcher()
-          .include(numFilesPattern, usedMetaPattern)
+          .include(usedMetaPattern)
           .exclude("Filter")
           .match();
     } finally {
@@ -1999,20 +2074,26 @@ public class TestMetastoreCommands extends ClusterTest {
           + "as select o_orderdate, convert_to(o_orderpriority, 'UTF8') as o_orderpriority\n"
           + "from dfs.`multilevel/parquet/1994/Q1`", tableName);
 
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
+
       String query = String.format("select * from dfs.%s where o_orderpriority = '1-URGENT'", tableName);
       long expectedRowCount = 3;
-      int expectedNumFiles = 1;
 
       long actualRowCount = queryBuilder().sql(query).run().recordCount();
       assertEquals(expectedRowCount, actualRowCount);
 
-      String numFilesPattern = "numFiles=" + expectedNumFiles;
       String usedMetaPattern = "usedMetastore=true";
 
+      // do not match expected files number since CTAS may create
+      // different files number due to small planner.slice_target value
       queryBuilder().sql(query, tableName)
           .planMatcher()
-          .include(numFilesPattern, usedMetaPattern)
+          .include(usedMetaPattern)
           .exclude("Filter")
           .match();
     } finally {
@@ -2028,7 +2109,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query =
           "select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2058,7 +2144,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query =
           "select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2089,7 +2180,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query =
           "select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2118,7 +2214,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query =
           "select dir0, dir1, o_custkey, o_orderdate from dfs.`%s`\n" +
@@ -2150,7 +2251,12 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.`%s` as select * from cp.`tpch/nation.parquet`", tableName);
       run("create table dfs.`%1$s/%1$s` as select * from cp.`tpch/nation.parquet`", tableName);
 
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query = "select * from  dfs.`%s`";
       long expectedRowCount = 50;
@@ -2174,7 +2280,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
   @Test
   public void testFieldWithDots() throws Exception {
-    String tableName = "dfs.tmp.`complex_table`";
+    String tableName = "dfs.tmp.complex_table";
     try {
       run("create table %s as\n" +
           "select cast(1 as int) as `column.with.dots`, t.`column`.`with.dots`\n" +
@@ -2191,7 +2297,12 @@ public class TestMetastoreCommands extends ClusterTest {
           .include("usedMetastore=false")
           .match();
 
-      run("analyze table %s REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+          .go();
 
       actualRowCount = queryBuilder().sql(query, tableName).run().recordCount();
 
@@ -2208,13 +2319,18 @@ public class TestMetastoreCommands extends ClusterTest {
 
   @Test
   public void testBooleanPartitionPruning() throws Exception {
-    String tableName = "dfs.tmp.`interval_bool_partition`";
+    String tableName = "dfs.tmp.interval_bool_partition";
 
     try {
       run("create table %s partition by (col_bln) as\n" +
           "select * from cp.`parquet/alltypes_required.parquet`", tableName);
 
-      run("analyze table %s REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+          .go();
 
       String query = "select * from %s where col_bln = true";
       int expectedRowCount = 2;
@@ -2249,7 +2365,12 @@ public class TestMetastoreCommands extends ClusterTest {
           "union all\n" +
           "select col_notexist from cp.`tpch/region.parquet`", tableName);
 
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       String query = "select mykey from dfs.tmp.`t5` where mykey = 100";
       long actualRowCount = queryBuilder().sql(query, tableName).run().recordCount();
@@ -2278,7 +2399,12 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.tmp.`%s/b` as\n" +
           "select case when true then 100 else null end as mykey from cp.`tpch/region.parquet`", tableName);
 
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       String query = "select mykey from dfs.tmp.`%s` where mykey is null";
 
@@ -2308,7 +2434,12 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.tmp.`%s/b` as\n" +
           "select  case when true then 100 else null end as mykey from cp.`tpch/region.parquet`", tableName);
 
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       String query = "select mykey from dfs.tmp.`%s` where mykey is null";
 
@@ -2338,7 +2469,12 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.tmp.`%s/b` as\n" +
           "select case when true then 100 else null end as mykey from cp.`tpch/region.parquet`", tableName);
 
-      run("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` columns none REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       String query = "select mykey from dfs.tmp.`%s` where mykey is null";
 
@@ -2364,7 +2500,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
 
       String query = "select * from dfs.`%s`";
       long expectedRowCount = 120;
@@ -2387,13 +2528,18 @@ public class TestMetastoreCommands extends ClusterTest {
   }
 
   @Test
-  public void testAnalyzeWithFallbackError() throws Exception {
+  public void testAnalyzeWithDisabledFallback() throws Exception {
     String tableName = "parquetAnalyzeWithFallback";
 
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA 'file' level", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
       client.alterSession(ExecConstants.METASTORE_FALLBACK_TO_FILE_METADATA, false);
 
       queryBuilder()
@@ -2414,7 +2560,12 @@ public class TestMetastoreCommands extends ClusterTest {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.default.%s]", tableName))
+          .go();
       client.alterSession(ExecConstants.METASTORE_USE_SCHEMA_METADATA, false);
 
       queryBuilder()
@@ -2438,7 +2589,12 @@ public class TestMetastoreCommands extends ClusterTest {
       client.alterSession(ExecConstants.METASTORE_USE_SCHEMA_METADATA, false);
       client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
       run("create table %s as select 'a' as c from (values(1))", table);
-      run("analyze table %s REFRESH METADATA", table);
+      testBuilder()
+          .sqlQuery("analyze table %s REFRESH METADATA", table)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", table))
+          .go();
 
       run("create schema (o_orderstatus varchar) for table %s", table);
 
@@ -2460,7 +2616,12 @@ public class TestMetastoreCommands extends ClusterTest {
 
       client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
 
-      run("analyze table %s REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+          .go();
 
       String query = " select employee_id from %s where department_id = 2";
 
@@ -2487,7 +2648,12 @@ public class TestMetastoreCommands extends ClusterTest {
 
       client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), false);
 
-      run("analyze table %s REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+          .go();
 
       String query = "select employee_id from %s where department_id = 2";
 
@@ -2515,7 +2681,12 @@ public class TestMetastoreCommands extends ClusterTest {
 
       client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), false);
 
-      run("analyze table %s REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table %s REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+          .go();
 
       client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
 
@@ -2546,7 +2717,12 @@ public class TestMetastoreCommands extends ClusterTest {
 
       client.alterSession(PlannerSettings.STATISTICS_USE.getOptionName(), true);
 
-      run("ANALYZE TABLE %s COLUMNS(department_id) REFRESH METADATA COMPUTE STATISTICS SAMPLE 95 PERCENT", tableName);
+      testBuilder()
+          .sqlQuery("ANALYZE TABLE %s COLUMNS(department_id) REFRESH METADATA COMPUTE STATISTICS SAMPLE 95 PERCENT", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [%s]", tableName))
+          .go();
 
       String query = "select employee_id from %s where department_id = 2";
 
@@ -2572,7 +2748,12 @@ public class TestMetastoreCommands extends ClusterTest {
       run("create table dfs.tmp.`%s` as\n" +
           "select * from cp.`tpch/region.parquet`", tableName);
 
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       MetastoreTableInfo metastoreTableInfo = cluster.drillbit().getContext()
           .getMetastoreRegistry()
@@ -2712,7 +2893,12 @@ public class TestMetastoreCommands extends ClusterTest {
     File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       File fileToUpdate = new File(new File(new File(table, "1994"), "Q4"), "orders_94_q4.parquet");
       long lastModified = fileToUpdate.lastModified();
@@ -2756,7 +2942,12 @@ public class TestMetastoreCommands extends ClusterTest {
     File table = dirTestWatcher.copyResourceToTestTmp(Paths.get("multilevel/parquet"), Paths.get(tableName));
 
     try {
-      run("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName);
+      testBuilder()
+          .sqlQuery("analyze table dfs.tmp.`%s` REFRESH METADATA", tableName)
+          .unOrdered()
+          .baselineColumns("ok", "summary")
+          .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+          .go();
 
       dirTestWatcher.copyResourceToTestTmp(
           Paths.get("multilevel", "parquet", "1994", "Q1", "orders_94_q1.parquet"),
diff --git a/exec/java-exec/src/test/resources/functions/test_covariance.json b/exec/java-exec/src/test/resources/functions/test_covariance.json
index 0d74f70..33f9567 100644
--- a/exec/java-exec/src/test/resources/functions/test_covariance.json
+++ b/exec/java-exec/src/test/resources/functions/test_covariance.json
@@ -64,10 +64,10 @@
       "ref" : "`EXPR$7`",
       "expr" : "corr(`A`, `B`) "
     }, {
-      "ref" : "`EXPR$7`",
+      "ref" : "`EXPR$8`",
       "expr" : "corr(`A`, `C`) "
     }, {
-      "ref" : "`EXPR$8`",
+      "ref" : "`EXPR$9`",
       "expr" : "corr(`C`, `D`) "
     } ]
   }, {
@@ -75,4 +75,4 @@
     "@id" : 4,
     "child" : 3
   } ]
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/test/resources/functions/test_logical_aggr.json b/exec/java-exec/src/test/resources/functions/test_logical_aggr.json
index f6d83b5..6e071c5 100644
--- a/exec/java-exec/src/test/resources/functions/test_logical_aggr.json
+++ b/exec/java-exec/src/test/resources/functions/test_logical_aggr.json
@@ -61,10 +61,10 @@
       "ref" : "`EXPR$5`",
       "expr" : "bit_or(`D`) "
     }, {
-      "ref" : "`EXPR$5`",
+      "ref" : "`EXPR$6`",
       "expr" : "bool_or(`C`) "
     }, {
-      "ref" : "`EXPR$6`",
+      "ref" : "`EXPR$7`",
       "expr" : "bool_and(`C`) "
     } ]
   }, {
@@ -72,4 +72,4 @@
     "@id" : 4,
     "child" : 3
   } ]
-}
\ No newline at end of file
+}
diff --git a/exec/vector/src/main/codegen/templates/ComplexWriters.java b/exec/vector/src/main/codegen/templates/ComplexWriters.java
index 3539c62..904c825 100644
--- a/exec/vector/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/vector/src/main/codegen/templates/ComplexWriters.java
@@ -106,7 +106,11 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
 
   public void setPosition(int idx) {
     super.setPosition(idx);
-    mutator.startNewValue(idx);
+    // calls startNewValue only for the case
+    // when it wouldn't override existing data
+    if (idx >= vector.getAccessor().getValueCount()) {
+      mutator.startNewValue(idx);
+    }
   }
   
   <#else>
diff --git a/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java b/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java
index 8d4db48..30f6975 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/FunctionHolderExpression.java
@@ -95,7 +95,7 @@ public abstract class FunctionHolderExpression extends LogicalExpressionBase {
    *
    * @param fieldReference FieldReference to set.
    */
-  public void getFieldReference(FieldReference fieldReference) {
+  public void setFieldReference(FieldReference fieldReference) {
     this.fieldReference = fieldReference;
   }
 }
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java b/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java
index 8d065bb..183c9b2 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/MetadataAggregate.java
@@ -33,6 +33,6 @@ public class MetadataAggregate extends SingleInputOperator {
 
   @Override
   public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
-    throw new UnsupportedOperationException("MetadataController does not support visitors");
+    throw new UnsupportedOperationException("MetadataAggregate does not support visitors");
   }
 }