You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2019/05/03 22:46:27 UTC

[drill] 01/08: DRILL-7062: Initial implementation of run-time rowgroup pruning closes #1738

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9dc81e199cbabedbdd68acfa676a66a022b51907
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Tue Mar 26 20:11:55 2019 -0700

    DRILL-7062: Initial implementation of run-time rowgroup pruning
    closes #1738
---
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  18 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |   3 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |   4 +
 .../base/AbstractGroupScanWithMetadata.java        |  43 +++-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   4 +-
 .../exec/planner/physical/PlannerSettings.java     |   6 +-
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../exec/store/CommonParquetRecordReader.java      |  85 +++++++
 .../store/parquet/AbstractParquetGroupScan.java    |  11 +
 .../store/parquet/AbstractParquetRowGroupScan.java |  18 +-
 .../parquet/AbstractParquetScanBatchCreator.java   | 262 +++++++++++++++++----
 .../drill/exec/store/parquet/ParquetGroupScan.java |   3 +-
 .../exec/store/parquet/ParquetPushDownFilter.java  |  38 ++-
 .../exec/store/parquet/ParquetReaderStats.java     |  13 +-
 .../exec/store/parquet/ParquetRowGroupScan.java    |  23 +-
 .../store/parquet/ParquetTableMetadataUtils.java   |   9 +-
 .../parquet/columnreaders/ParquetRecordReader.java |  64 +----
 .../store/parquet/columnreaders/ReadState.java     |   7 +
 .../batchsizing/RecordBatchSizerManager.java       |   3 +-
 .../exec/store/parquet/metadata/Metadata.java      |  70 ++++--
 .../exec/store/parquet/metadata/Metadata_V4.java   |   2 +
 .../exec/store/parquet2/DrillParquetReader.java    |  25 +-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../store/parquet/TestParquetFilterPushDown.java   |  66 +++++-
 .../TestPushDownAndPruningWithItemStar.java        |  16 +-
 25 files changed, 596 insertions(+), 199 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index 78e107a..09533bb 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -48,6 +49,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   private final HiveStoragePluginConfig hiveStoragePluginConfig;
   private final HivePartitionHolder hivePartitionHolder;
   private final Map<String, String> confProperties;
+  private final TupleSchema tupleSchema;
 
   @JsonCreator
   public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -58,7 +60,8 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                             @JsonProperty("confProperties") Map<String, String> confProperties,
                                             @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
-                                            @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
+                                            @JsonProperty("filter") LogicalExpression filter,
+                                            @JsonProperty("tupleScema") TupleSchema tupleSchema) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
         rowGroupReadEntries,
@@ -66,7 +69,8 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
         hivePartitionHolder,
         confProperties,
         readerConfig,
-        filter);
+        filter,
+        tupleSchema);
   }
 
   public HiveDrillNativeParquetRowGroupScan(String userName,
@@ -76,12 +80,14 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             HivePartitionHolder hivePartitionHolder,
                                             Map<String, String> confProperties,
                                             ParquetReaderConfig readerConfig,
-                                            LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
+                                            LogicalExpression filter,
+                                            TupleSchema tupleSchema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter,null, tupleSchema);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
     this.confProperties = confProperties;
+    this.tupleSchema = tupleSchema;
   }
 
   @JsonProperty
@@ -108,7 +114,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, tupleSchema);
   }
 
   @Override
@@ -119,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, tupleSchema);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 0b9a463..94014fe 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.metastore.LocationProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -143,7 +144,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       subPartitionHolder.add(readEntry.getPath(), values);
     }
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, (TupleSchema) getTableMetadata().getSchema());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 24372ef..3503507 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -727,6 +727,10 @@ public final class ExecConstants {
 
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
 
+  public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning";
+  public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,
+    new OptionDescription("Enables skipping the runtime pruning of the rowgroups"));
+
   public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
 
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 59d099f..15e1387 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.stat.RowsMatch;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.MaterializedField;
@@ -68,6 +69,9 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+// import static org.apache.drill.exec.ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS;
+import static org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
+
 /**
  * Represents table group scan with metadata usage.
  */
@@ -191,12 +195,25 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     this.filter = filter;
   }
 
+  /**
+   *  Set the filter - thus enabling runtime rowgroup pruning
+   *  The runtime pruning can be disabled with an option.
+   * @param filterExpr The filter to be used at runtime to match with rowgroups' footers
+   * @param optimizerContext The context for the options
+   */
+  public void setFilterForRuntime(LogicalExpression filterExpr, OptimizerRulesContext optimizerContext) {
+    OptionManager options = optimizerContext.getPlannerSettings().getOptions();
+    boolean skipRuntimePruning = options.getBoolean(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY); // if option is set to disable runtime pruning
+    if ( ! skipRuntimePruning ) { setFilter(filterExpr); }
+  }
+
   @Override
   public AbstractGroupScanWithMetadata applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
 
     // Builds filter for pruning. If filter cannot be built, null should be returned.
-    FilterPredicate filterPredicate = getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
+    FilterPredicate filterPredicate =
+            getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
     if (filterPredicate == null) {
       logger.debug("FilterPredicate cannot be built.");
       return null;
@@ -271,6 +288,14 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
    */
   protected abstract GroupScanWithMetadataFilterer getFilterer();
 
+  public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
+                                                   UdfUtilities udfUtilities,
+                                                   FunctionImplementationRegistry functionImplementationRegistry,
+                                                   OptionManager optionManager,
+                                                   boolean omitUnsupportedExprs) {
+    return getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager,
+            omitUnsupportedExprs, supportsFileImplicitColumns(), (TupleSchema) getTableMetadata().getSchema());
+  }
   /**
    * Returns parquet filter predicate built from specified {@code filterExpr}.
    *
@@ -281,20 +306,19 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
    *                                       may be omitted from the resulting expression
    * @return parquet filter predicate
    */
-  public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
+  public static FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
                                             UdfUtilities udfUtilities,
                                             FunctionImplementationRegistry functionImplementationRegistry,
                                             OptionManager optionManager,
-                                            boolean omitUnsupportedExprs) {
-    TupleMetadata types = getSchema();
-    if (types == null) {
-      throw new UnsupportedOperationException("At least one schema source should be available.");
-    }
+                                            boolean omitUnsupportedExprs,
+                                            boolean supportsFileImplicitColumns,
+                                            TupleSchema tupleSchema) {
+    TupleMetadata types = tupleSchema.copy();
 
     Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new FilterEvaluatorUtils.FieldReferenceFinder(), null);
 
     // adds implicit or partition columns if they weren't added before.
-    if (supportsFileImplicitColumns()) {
+    if (supportsFileImplicitColumns) {
       for (SchemaPath schemaPath : schemaPathsInExpr) {
         if (isImplicitOrPartCol(schemaPath, optionManager) && SchemaPathUtils.getColumnMetadata(schemaPath, types) == null) {
           types.add(MaterializedField.create(schemaPath.getRootSegmentPath(), Types.required(TypeProtos.MinorType.VARCHAR)));
@@ -467,7 +491,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
   protected abstract boolean supportsFileImplicitColumns();
   protected abstract List<String> getPartitionValues(LocationProvider locationProvider);
 
-  protected boolean isImplicitOrPartCol(SchemaPath schemaPath, OptionManager optionManager) {
+  public static boolean isImplicitOrPartCol(SchemaPath schemaPath, OptionManager optionManager) {
     Set<String> implicitColNames = ColumnExplorer.initImplicitFileColumns(optionManager).keySet();
     return ColumnExplorer.isPartitionColumn(optionManager, schemaPath) || implicitColNames.contains(schemaPath.getRootSegmentPath());
   }
@@ -628,7 +652,6 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
             matchAllMetadata = true;
             partitions = filterAndGetMetadata(schemaPathsInExpr, source.getPartitionsMetadata(), filterPredicate, optionManager);
           } else {
-            matchAllMetadata = false;
             overflowLevel = MetadataLevel.PARTITION;
           }
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 07e2ae5..cdb36f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -72,7 +72,7 @@ public class ScanBatch implements CloseableRecordBatch {
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
-  private Iterator<RecordReader> readers;
+  private Iterator<? extends RecordReader> readers;
   private RecordReader currentReader;
   private BatchSchema schema;
   private final Mutator mutator;
@@ -100,7 +100,7 @@ public class ScanBatch implements CloseableRecordBatch {
    *                        columns, or there is a one-to-one mapping between reader and implicitColumns.
    */
   public ScanBatch(FragmentContext context,
-                   OperatorContext oContext, List<RecordReader> readerList,
+                   OperatorContext oContext, List<? extends RecordReader> readerList,
                    List<Map<String, String>> implicitColumnList) {
     this.context = context;
     this.readers = readerList.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 0a18126..a5506d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -182,8 +182,10 @@ public class PlannerSettings implements Context{
   public static final BooleanValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING = new BooleanValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY,
       new OptionDescription("Enables filter pushdown optimization for Parquet files. Drill reads the file metadata, stored in the footer, to eliminate row groups based on the filter condition. Default is true. (Drill 1.9+)"));
   public static final String PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY = "planner.store.parquet.rowgroup.filter.pushdown.threshold";
-  public static final PositiveLongValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new PositiveLongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY, Long.MAX_VALUE,
-      new OptionDescription("Sets the number of row groups that a table can have. You can increase the threshold if the filter can prune many row groups. However, if this setting is too high, the filter evaluation overhead increases. Base this setting on the data set. Reduce this setting if the planning time is significant or you do not see any benefit at runtime. (Drill 1.9+)"));
+  public static final LongValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new LongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY,
+      new OptionDescription("Maximal number of row groups a table can have to enable pruning by the planner. Base this setting on the data set - increasing if needed " +
+        "would add planning overhead, but may reduce execution overhead if the filter is relevant (e.g., on a sorted column, or many nulls). " +
+        "Reduce this setting if the planning time is significant or you do not see any benefit at runtime. A non-positive value disables plan time pruning."));
 
   public static final String QUOTING_IDENTIFIERS_KEY = "planner.parser.quoting_identifiers";
   public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new EnumeratedStringValidator(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6719db7..61fefe7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -244,6 +244,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER),
       new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER),
+      new OptionDefinition(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING),
       new OptionDefinition(ExecConstants.MIN_READER_WIDTH),
       new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST),
       new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
new file mode 100644
index 0000000..1f6f367
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.slf4j.Logger;
+
+public abstract class CommonParquetRecordReader extends AbstractRecordReader {
+  protected final FragmentContext fragmentContext;
+
+  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+
+  protected OperatorContext operatorContext;
+
+  protected ParquetMetadata footer;
+
+  public CommonParquetRecordReader(ParquetMetadata footer, FragmentContext fragmentContext) {
+    this.footer = footer;
+    this.fragmentContext = fragmentContext;
+  }
+
+  public void updateRowgroupsStats(long numRowgroups, long rowgroupsPruned) {
+    parquetReaderStats.numRowgroups.set(numRowgroups);
+    parquetReaderStats.rowgroupsPruned.set(rowgroupsPruned);
+  }
+
+  public enum Metric implements MetricDef {
+    NUM_ROWGROUPS,               // Number of rowgroups assigned to this minor fragment
+    ROWGROUPS_PRUNED,            // Number of rowgroups pruned out at runtime
+    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
+    NUM_DATA_PAGE_lOADS,         // Number of data pages read
+    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
+    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
+    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
+    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary pages
+    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
+    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
+    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
+    TIME_DICT_PAGE_LOADS,          // Time in nanos in reading dictionary pages from disk
+    TIME_DATA_PAGE_LOADS,          // Time in nanos in reading data pages from disk
+    TIME_DATA_PAGE_DECODE,         // Time in nanos in decoding data pages
+    TIME_DICT_PAGE_DECODE,         // Time in nanos in decoding dictionary pages
+    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing dictionary pages
+    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
+    TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an async disk read to complete
+    TIME_DISK_SCAN,                // Time in nanos spent in reading data from disk.
+    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed width data to value vectors
+    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting varwidth data to value vectors
+    TIME_PROCESS;                  // Time in nanos spent in processing
+
+    @Override public int metricId() {
+      return ordinal();
+    }
+  }
+
+  protected void closeStats(Logger logger, Path hadoopPath) {
+    if (parquetReaderStats != null) {
+      if ( operatorContext != null ) {
+        parquetReaderStats.update(operatorContext.getStats());
+      }
+      parquetReaderStats.logStats(logger, hadoopPath);
+      parquetReaderStats = null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 5986b08..e368bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -267,6 +267,17 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
         // no need to create new group scan with the same row group.
         return null;
       }
+
+      // Stop files pruning for the case:
+      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      if (getRowGroupsMetadata().size() >= optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
+        this.rowGroups = getRowGroupsMetadata();
+        matchAllMetadata = false;
+        logger.trace("Stopping plan time pruning. Metadata has {} rowgroups, but the threshold option is set to {} rowgroups", this.rowGroups.size(),
+          optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD));
+        return null;
+      }
+
       logger.debug("All row groups have been filtered out. Add back one to get schema from scanner");
 
       Map<Path, FileMetadata> filesMap = getNextOrEmpty(getFilesMetadata().values()).stream()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 52b2baa..f05175e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -27,7 +27,9 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,17 +42,23 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
   protected final List<SchemaPath> columns;
   protected final ParquetReaderConfig readerConfig;
   protected final LogicalExpression filter;
+  protected final Path selectionRoot;
+  protected final TupleSchema tupleSchema;
 
   protected AbstractParquetRowGroupScan(String userName,
                                      List<RowGroupReadEntry> rowGroupReadEntries,
                                      List<SchemaPath> columns,
                                      ParquetReaderConfig readerConfig,
-                                     LogicalExpression filter) {
+                                     LogicalExpression filter,
+                                     Path selectionRoot,
+                                     TupleSchema tupleSchema) {
     super(userName);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
     this.readerConfig = readerConfig == null ? ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
+    this.selectionRoot = selectionRoot;
+    this.tupleSchema = tupleSchema;
   }
 
   @JsonProperty
@@ -95,6 +103,14 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
     return Collections.emptyIterator();
   }
 
+  @JsonProperty
+  public Path getSelectionRoot() {
+    return selectionRoot;
+  }
+
+  @JsonProperty
+  public TupleSchema getTupleSchema() { return tupleSchema; }
+
   public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
   @JsonIgnore
   public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index b1819e6..41f52d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -17,6 +17,19 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.exec.expr.FilterPredicate;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.RowsMatch;
+import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.CommonParquetRecordReader;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
+import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
+import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.shaded.guava.com.google.common.base.Functions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -28,11 +41,11 @@ import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
-import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -42,10 +55,12 @@ import org.apache.parquet.hadoop.util.HadoopInputFile;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractParquetScanBatchCreator {
@@ -65,21 +80,65 @@ public abstract class AbstractParquetScanBatchCreator {
 
     // keep footers in a map to avoid re-reading them
     Map<Path, ParquetMetadata> footers = new HashMap<>();
-    List<RecordReader> readers = new LinkedList<>();
+    List<CommonParquetRecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be populated upon the first read to
-      provide the reader with all of th file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row groups are pruned out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+    TupleSchema tupleSchema = rowGroupScan.getTupleSchema();
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      boolean doRuntimePruning = filterExpr != null && // was a filter given ?   And it is not just a "TRUE" predicate
+        ! ((filterExpr instanceof ValueExpressions.BooleanExpression) && ((ValueExpressions.BooleanExpression) filterExpr).getBoolean() );
+
+      // Runtime pruning: Avoid recomputing metadata objects for each row-group in case they use the same file
+      // by keeping the following objects computed earlier (relies on same file being in consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V4.ParquetTableMetadata_v4 tableMetadataV4 = null;
+      Metadata_V4.ParquetFileAndRowCountMetadata fileMetadataV4 = null;
+      FilterPredicate filterPredicate = null;
+      Set<SchemaPath> schemaPathsInExpr = null;
+      Set<String> columnsInExpr = null;
+      // for debug/info logging
+      long totalPruneTime = 0;
+      long totalRowgroups = rowGroupScan.getRowGroupReadEntries().size();
+      Stopwatch pruneTimer = Stopwatch.createUnstarted();
+
+      // If pruning - Prepare the predicate and the columns before the FOR LOOP
+      if ( doRuntimePruning ) {
+        filterPredicate = AbstractGroupScanWithMetadata.getFilterPredicate(filterExpr, context,
+          (FunctionImplementationRegistry) context.getFunctionRegistry(), context.getOptions(), true,
+          true /* supports file implicit columns */,
+          tupleSchema);
+        // Extract only the relevant columns from the filter (sans implicit columns, if any)
+        schemaPathsInExpr = filterExpr.accept(new FilterEvaluatorUtils.FieldReferenceFinder(), null);
+        columnsInExpr = new HashSet<>();
+        String partitionColumnLabel = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+        for (SchemaPath path : schemaPathsInExpr) {
+          if (rowGroupScan.supportsFileImplicitColumns() &&
+            path.toString().matches(partitionColumnLabel+"\\d+")) {
+            continue;  // skip implicit columns like dir0, dir1
+          }
+          columnsInExpr.add(path.getRootSegmentPath());
+        }
+        doRuntimePruning = ! columnsInExpr.isEmpty(); // just in case: if no columns - cancel pruning
+      }
+
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
+        /*
+        Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
+        TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
+        we should add more information to the RowGroupInfo that will be populated upon the first read to
+        provide the reader with all of th file meta-data it needs
+        These fields will be added to the constructor below
+        */
+
         Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
         DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
         if (!footers.containsKey(rowGroup.getPath())) {
           if (timer != null) {
             timer.start();
@@ -94,50 +153,79 @@ public abstract class AbstractParquetScanBatchCreator {
         }
         ParquetMetadata footer = footers.get(rowGroup.getPath());
 
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
-          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
-        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
-
-        boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
-        boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
-        logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", useNewReader ? "enabled" : "disabled",
-            containsComplexColumn ? "found." : "not found.");
-        RecordReader reader;
-
-        if (useNewReader || containsComplexColumn) {
-          reader = new DrillParquetReader(context,
-              footer,
-              rowGroup,
-              columnExplorer.getTableColumns(),
-              fs,
-              containsCorruptDates);
-        } else {
-          reader = new ParquetRecordReader(context,
-              rowGroup.getPath(),
-              rowGroup.getRowGroupIndex(),
-              rowGroup.getNumRecordsToRead(),
-              fs,
-              CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-              footer,
-              rowGroupScan.getColumns(),
-              containsCorruptDates);
-        }
+        //
+        //   If a filter is given (and it is not just "TRUE") - then use it to perform run-time pruning
+        //
+        if (doRuntimePruning) { // skip when no filter or filter is TRUE
+
+          pruneTimer.start();
+
+          int rowGroupIndex = rowGroup.getRowGroupIndex();
+          long footerRowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+          // When starting a new file, or at the first time - Initialize the path specific metadata
+          if (!rowGroup.getPath().equals(prevRowGroupPath)) {
+            // Create a table metadata (V4)
+            tableMetadataV4 = new Metadata_V4.ParquetTableMetadata_v4();
+
+            // The file status for this file
+            FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+
+            // The file metadata (only for the columns used in the filter)
+            fileMetadataV4 = Metadata.getParquetFileMetadata_v4(tableMetadataV4, footer, fileStatus, fs, false, true, columnsInExpr, readerConfig);
+
+            prevRowGroupPath = rowGroup.getPath(); // for next time
+          }
+
+          MetadataBase.RowGroupMetadata rowGroupMetadata = fileMetadataV4.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
+
+          Map<SchemaPath, ColumnStatistics> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, rowGroupMetadata);
 
-        logger.debug("Query {} uses {}",
-            QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
-            reader.getClass().getSimpleName());
-        readers.add(reader);
+          //
+          // Perform the Run-Time Pruning - i.e. Skip this rowgroup if the match fails
+          //
+          RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount);
 
-        List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
-        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, rowGroupScan.supportsFileImplicitColumns());
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
+          // collect logging info
+          long timeToRead = pruneTimer.elapsed(TimeUnit.MICROSECONDS);
+          pruneTimer.stop();
+          pruneTimer.reset();
+          totalPruneTime += timeToRead;
+          logger.trace("Run-time pruning: {} row-group {} (RG index: {} row count: {}), took {} usec", // trace each single rowgroup
+            match == RowsMatch.NONE ? "Excluded" : "Included", rowGroup.getPath(), rowGroupIndex, footerRowCount, timeToRead);
+
+          // If this rowgroup failed the match - skip it
+          if (match == RowsMatch.NONE) {
+            rowgroupsPruned++; // one more RG was pruned
+            if (firstRowGroup == null) {  // keep first RG, to be used in case all row groups are pruned
+              firstRowGroup = rowGroup;
+              firstFooter = footer;
+            }
+            continue; // This Row group does not comply with the filter - prune it out and check the next Row Group
+          }
         }
 
-      } catch (IOException e) {
-        throw new ExecutionSetupException(e);
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, rowGroupScan, oContext, columnExplorer, readers, implicitColumns, mapWithMaxColumns, rowGroup, fs, footer, false);
+      }
+
+      // in case all row groups were pruned out - create a single reader for the first one (so that the schema could be returned)
+      if ( readers.size() == 0 && firstRowGroup != null ) {
+        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(firstRowGroup), firstRowGroup.getPath());
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, rowGroupScan, oContext, columnExplorer, readers, implicitColumns, mapWithMaxColumns, firstRowGroup, fs,
+          firstFooter, true);
+      }
+      if ( totalPruneTime > 0 ) {
+        logger.info("Finished parquet_runtime_pruning in {} usec. Out of given {} rowgroups, {} were pruned. {}", totalPruneTime, totalRowgroups, rowgroupsPruned,
+          totalRowgroups == rowgroupsPruned ? "ALL_PRUNED !!" : "");
       }
+
+      // Update stats (same in every reader - the others would just overwrite the stats)
+      for (CommonParquetRecordReader rr : readers ) {
+          rr.updateRowgroupsStats(totalRowgroups, rowgroupsPruned);
+      }
+
+    } catch (IOException|InterruptedException e) {
+      throw new ExecutionSetupException(e);
     }
 
     // all readers should have the same number of implicit columns, add missing ones with value null
@@ -149,6 +237,78 @@ public abstract class AbstractParquetScanBatchCreator {
     return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
+  /**
+   *  Create a reader and add it to the list of readers.
+   *
+   * @param context The fragment context
+   * @param rowGroupScan RowGroup Scan
+   * @param oContext Operator context
+   * @param columnExplorer The column helper class object
+   * @param readers the readers' list where a new reader is added to
+   * @param implicitColumns the implicit columns list
+   * @param mapWithMaxColumns To be modified, in case there are implicit columns
+   * @param rowGroup create a reader for this specific row group
+   * @param fs file system
+   * @param footer this file's footer
+   * // @param readSchemaOnly - if true sets the number of rows to read to be zero
+   * @return the (possibly modified) input  mapWithMaxColumns
+   */
+  private Map<String, String> createReaderAndImplicitColumns(ExecutorFragmentContext context,
+                                                             AbstractParquetRowGroupScan rowGroupScan,
+                                                             OperatorContext oContext,
+                                                             ColumnExplorer columnExplorer,
+                                                             List<CommonParquetRecordReader> readers,
+                                                             List<Map<String, String>> implicitColumns,
+                                                             Map<String, String> mapWithMaxColumns,
+                                                             RowGroupReadEntry rowGroup,
+                                                             DrillFileSystem fs,
+                                                             ParquetMetadata footer,
+                                                             boolean readSchemaOnly
+  ) {
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
+      rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
+    logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+
+    boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
+    boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
+    logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", useNewReader ? "enabled" : "disabled",
+        containsComplexColumn ? "found." : "not found.");
+    CommonParquetRecordReader reader;
+
+    if (useNewReader || containsComplexColumn) {
+      reader = new DrillParquetReader(context,
+          footer,
+          rowGroup,
+          columnExplorer.getTableColumns(),
+          fs,
+          containsCorruptDates); // TODO: if readSchemaOnly - then set to zero rows to read (currently fails)
+    } else {
+      reader = new ParquetRecordReader(context,
+          rowGroup.getPath(),
+          rowGroup.getRowGroupIndex(),
+          readSchemaOnly ? 0 : rowGroup.getNumRecordsToRead(), // if readSchemaOnly - then set to zero rows to read
+          fs,
+          CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+          footer,
+          rowGroupScan.getColumns(),
+          containsCorruptDates);
+    }
+
+    logger.debug("Query {} uses {}",
+        QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
+        reader.getClass().getSimpleName());
+    readers.add(reader);
+
+    List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+    Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, rowGroupScan.supportsFileImplicitColumns());
+    implicitColumns.add(implicitValues);
+    if (implicitValues.size() > mapWithMaxColumns.size()) {
+      mapWithMaxColumns = implicitValues;
+    }
+    return mapWithMaxColumns;
+  }
+
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index e4fd382..c35e21b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -206,7 +206,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter,
+      tableMetadata == null ? null : (TupleSchema) tableMetadata.getSchema());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 9e39b54..bd2e119 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.calcite.rel.core.Filter;
 import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
 import org.apache.drill.exec.expr.FilterPredicate;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
@@ -170,6 +171,8 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
     LogicalExpression conditionExp = DrillOptiq.toDrill(
         new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifiedPred);
 
+    // Default - pass the original filter expr to (potentialy) be used at run-time
+    groupScan.setFilterForRuntime(conditionExp, optimizerContext); // later may remove or set to another filter (see below)
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     AbstractGroupScanWithMetadata newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
@@ -187,6 +190,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
         // If current row group fully matches filter,
         // but row group pruning did not happen, remove the filter.
         if (nonConvertedPredList.isEmpty()) {
+          groupScan.setFilterForRuntime(null, optimizerContext); // disable the original filter expr (i.e. don't use it at run-time)
           call.transformTo(child);
         } else if (nonConvertedPredList.size() == predList.size()) {
           // None of the predicates participated in filter pushdown.
@@ -194,11 +198,18 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
         } else {
           // If some of the predicates weren't used in the filter, creates new filter with them
           // on top of current scan. Excludes the case when all predicates weren't used in the filter.
-          call.transformTo(filter.copy(filter.getTraitSet(), child,
-              RexUtil.composeConjunction(
-                  filter.getCluster().getRexBuilder(),
-                  nonConvertedPredList,
-                  true)));
+          Filter theNewFilter  = filter.copy(filter.getTraitSet(), child,
+            RexUtil.composeConjunction(
+              filter.getCluster().getRexBuilder(),
+              nonConvertedPredList,
+              true));
+
+          LogicalExpression filterPredicate = DrillOptiq.toDrill(
+            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, theNewFilter.getCondition());
+
+          groupScan.setFilterForRuntime(filterPredicate, optimizerContext); // pass the new filter expr to (potentialy) be used at run-time
+
+          call.transformTo(theNewFilter); // Replace the child with the new filter on top of the child/scan
         }
       }
       return;
@@ -213,11 +224,18 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
     if (newGroupScan.isMatchAllMetadata()) {
       // creates filter from the expressions which can't be pushed to the scan
       if (!nonConvertedPredList.isEmpty()) {
-        newNode = filter.copy(filter.getTraitSet(), newNode,
-            RexUtil.composeConjunction(
-                filter.getCluster().getRexBuilder(),
-                nonConvertedPredList,
-                true));
+        Filter theFilterRel  = filter.copy(filter.getTraitSet(), newNode,
+          RexUtil.composeConjunction(
+            filter.getCluster().getRexBuilder(),
+            nonConvertedPredList,
+            true));
+
+        LogicalExpression filterPredicate = DrillOptiq.toDrill(
+          new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, theFilterRel.getCondition());
+
+        newGroupScan.setFilterForRuntime(filterPredicate, optimizerContext); // pass the new filter expr to (potentialy) be used at run-time
+
+        newNode = theFilterRel; // replace the new node with the new filter on top of that new node
       }
       call.transformTo(newNode);
       return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index 6a7b967..8b1c43d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -20,11 +20,14 @@ package org.apache.drill.exec.store.parquet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader.Metric;
+import org.apache.drill.exec.store.CommonParquetRecordReader.Metric;
 import org.apache.hadoop.fs.Path;
 
 public class ParquetReaderStats {
 
+  public AtomicLong numRowgroups = new AtomicLong();
+  public AtomicLong rowgroupsPruned = new AtomicLong();
+
   public AtomicLong numDictPageLoads = new AtomicLong();
   public AtomicLong numDataPageLoads = new AtomicLong();
   public AtomicLong numDataPagesDecoded = new AtomicLong();
@@ -54,8 +57,10 @@ public class ParquetReaderStats {
 
   public void logStats(org.slf4j.Logger logger, Path hadoopPath) {
     logger.trace(
-        "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+        "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
         hadoopPath,
+        numRowgroups,
+        rowgroupsPruned,
         numDictPageLoads,
         numDataPageLoads,
         numDataPagesDecoded,
@@ -79,6 +84,10 @@ public class ParquetReaderStats {
   }
 
   public void update(OperatorStats stats){
+    stats.setLongStat(Metric.NUM_ROWGROUPS,
+        numRowgroups.longValue());
+    stats.setLongStat(Metric.ROWGROUPS_PRUNED,
+        rowgroupsPruned.longValue());
     stats.addLongStat(Metric.NUM_DICT_PAGE_LOADS,
         numDictPageLoads.longValue());
     stats.addLongStat(Metric.NUM_DATA_PAGE_lOADS, numDataPageLoads.longValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 2513772..fcbf307 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
@@ -46,7 +47,6 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
-  private final Path selectionRoot;
 
   @JsonCreator
   public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -57,14 +57,16 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              @JsonProperty("columns") List<SchemaPath> columns,
                              @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                              @JsonProperty("selectionRoot") Path selectionRoot,
-                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
+                             @JsonProperty("filter") LogicalExpression filter,
+                             @JsonProperty("tupleSchema") TupleSchema tupleSchema) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
         rowGroupReadEntries,
         columns,
         readerConfig,
         selectionRoot,
-        filter);
+        filter,
+        tupleSchema);
   }
 
   public ParquetRowGroupScan(String userName,
@@ -73,11 +75,11 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              List<SchemaPath> columns,
                              ParquetReaderConfig readerConfig,
                              Path selectionRoot,
-                             LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
+                             LogicalExpression filter,
+                             TupleSchema tupleSchema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter, selectionRoot, tupleSchema);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
     this.formatConfig = formatPlugin.getConfig();
-    this.selectionRoot = selectionRoot;
   }
 
   @JsonProperty
@@ -90,11 +92,6 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
     return formatConfig;
   }
 
-  @JsonProperty
-  public Path getSelectionRoot() {
-    return selectionRoot;
-  }
-
   @JsonIgnore
   public ParquetFormatPlugin getStorageEngine() {
     return formatPlugin;
@@ -103,7 +100,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter, tupleSchema);
   }
 
   @Override
@@ -113,7 +110,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter, tupleSchema);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index b1a686b..b8a912d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Utility class for converting parquet metadata classes to metastore metadata classes.
@@ -264,7 +265,7 @@ public class ParquetTableMetadataUtils {
    * @return map with converted row group metadata
    */
   @SuppressWarnings("unchecked")
-  private static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
+  public static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
       MetadataBase.ParquetTableMetadataBase tableMetadata, MetadataBase.RowGroupMetadata rowGroupMetadata) {
 
     Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
@@ -301,8 +302,10 @@ public class ParquetTableMetadataUtils {
           Set<SchemaPath> schemaPaths, MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
     Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
     if (parquetTableMetadata instanceof Metadata_V4.ParquetTableMetadata_v4) {
-      for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata :
-          ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap().values()) {
+      ConcurrentHashMap<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4 > columnTypeInfoMap =
+        ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap();
+      if ( columnTypeInfoMap == null ) { return columnsStatistics; } // in some cases for runtime pruning
+      for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata : columnTypeInfoMap.values()) {
         SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
         if (!schemaPaths.contains(schemaPath)) {
           Map<StatisticsKind, Object> statistics = new HashMap<>();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index ba4c493..4f1c8a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import java.util.List;
@@ -28,11 +29,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
@@ -42,7 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
-public class ParquetRecordReader extends AbstractRecordReader {
+public class ParquetRecordReader extends CommonParquetRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   /** Set when caller wants to read all the rows contained within the Parquet file */
@@ -60,21 +58,19 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // used for clearing the first n bits of a byte
   public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
 
-  private OperatorContext operatorContext;
+  /** Parquet Schema */
+  ParquetSchema schema;
 
   private FileSystem fileSystem;
   private final long numRecordsToRead; // number of records to read
 
   Path hadoopPath;
-  private ParquetMetadata footer;
 
   private final CodecFactory codecFactory;
   int rowGroupIndex;
-  private final FragmentContext fragmentContext;
+
   ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
-  /** Parquet Schema */
-  ParquetSchema schema;
   /** Container object for holding Parquet columnar readers state */
   ReadState readState;
   /** Responsible for managing record batch size constraints */
@@ -92,36 +88,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
   @SuppressWarnings("unused")
   private Path name;
 
-  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
   private BatchReader batchReader;
 
-  public enum Metric implements MetricDef {
-    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
-    NUM_DATA_PAGE_lOADS,         // Number of data pages read
-    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
-    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
-    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
-    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary pages
-    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
-    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
-    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
-    TIME_DICT_PAGE_LOADS,          // Time in nanos in reading dictionary pages from disk
-    TIME_DATA_PAGE_LOADS,          // Time in nanos in reading data pages from disk
-    TIME_DATA_PAGE_DECODE,         // Time in nanos in decoding data pages
-    TIME_DICT_PAGE_DECODE,         // Time in nanos in decoding dictionary pages
-    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing dictionary pages
-    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
-    TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an async disk read to complete
-    TIME_DISK_SCAN,                // Time in nanos spent in reading data from disk.
-    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed width data to value vectors
-    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting varwidth data to value vectors
-    TIME_PROCESS;                  // Time in nanos spent in processing
-
-    @Override public int metricId() {
-      return ordinal();
-    }
-  }
-
   public ParquetRecordReader(FragmentContext fragmentContext,
       Path path,
       int rowGroupIndex,
@@ -156,15 +124,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+    super(footer, fragmentContext);
 
     this.name = path;
     this.hadoopPath = path;
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
-    this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
-    this.fragmentContext = fragmentContext;
     this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
     this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
     this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
@@ -321,15 +288,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
     codecFactory.release();
 
-    if (parquetReaderStats != null) {
-      updateStats();
-      parquetReaderStats.logStats(logger, hadoopPath);
-      parquetReaderStats = null;
-    }
-  }
-
-  private void updateStats() {
-    parquetReaderStats.update(operatorContext.getStats());
+    closeStats(logger, hadoopPath);
   }
 
   @Override
@@ -338,13 +297,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   private int initNumRecordsToRead(long numRecordsToRead, int rowGroupIndex, ParquetMetadata footer) {
+    if ( numRecordsToRead == 0 ) { return 0; } // runtime pruning sometimes prunes everything, needs one empty RG for the schema
+    int numRowsInRowgroup = (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
     // Callers can pass -1 if they want to read all rows.
     if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
-      return (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
-    } else {
-      assert (numRecordsToRead >= 0);
-      return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+      return numRowsInRowgroup;
     }
+    assert (numRecordsToRead > 0);
+    return (int) Math.min(numRecordsToRead, numRowsInRowgroup);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
index c545862..deccde5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
@@ -80,6 +80,13 @@ public class ReadState {
       nullFilledVectors = new ArrayList<>();
     }
 
+    // In the case where runtime pruning prunes out all the rowgroups, then just a single rowgroup
+    // with zero rows is read (in order to get the schema, no need for the rows)
+    if ( numRecordsToRead == 0 ) {
+      this.totalNumRecordsToRead = 0;
+      return;
+    }
+
     // Because of JIRA DRILL-6528, the Parquet reader is sometimes getting the wrong
     // number of rows to read. For now, returning all a file data (till
     // downstream operator stop consuming).
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index a39356b..8b0e34a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -371,7 +371,8 @@ public final class RecordBatchSizerManager {
 
   private void assignColumnsBatchMemory() {
 
-    if (getNumColumns() == 0) {
+    if (getNumColumns() == 0 ||
+        maxRecordsPerBatch == 0) { // Happens when all row-groups are pruned, and only one is returned empty (TODO: currently not empty)
       return;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 2b0581c..59849e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -470,7 +470,7 @@ public class Metadata {
 
     @Override
     protected ParquetFileAndRowCountMetadata runInner() throws Exception {
-      return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet);
+      return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet, readerConfig);
     }
 
     public String toString() {
@@ -478,7 +478,7 @@ public class Metadata {
     }
   }
 
-  private ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
+  private static ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
     if (type.isPrimitive()) {
       PrimitiveType primitiveType = (PrimitiveType) type;
       int precision = 0;
@@ -497,7 +497,7 @@ public class Metadata {
     return getColTypeInfo(schema, t, path, depth + 1);
   }
 
-  private class ColTypeInfo {
+  private static class ColTypeInfo {
     public OriginalType originalType;
     public int precision;
     public int scale;
@@ -513,28 +513,52 @@ public class Metadata {
     }
   }
 
+  // A private version of the following static method, with no footer given
+  private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
+                                                           final FileStatus file, final FileSystem fs,
+                                                           boolean allColumnsInteresting, Set<String> columnSet,
+                                                           ParquetReaderConfig readerConfig)
+    throws IOException, InterruptedException {
+    return getParquetFileMetadata_v4(parquetTableMetadata, null /* no footer */, file, fs, allColumnsInteresting, false, columnSet, readerConfig);
+  }
   /**
-   * Get the metadata for a single file
+   * Get the file metadata for a single file
+   *
+   * @param parquetTableMetadata The table metadata to be updated with all the columns' info
+   * @param footer If non null, use this footer instead of reading it from the file
+   * @param file The file
+   * @param allColumnsInteresting If true, read the min/max metadata for all the columns
+   * @param skipNonInteresting If true, collect info only for the interesting columns
+   * @param columnSet Specifies specific columns for which min/max metadata is collected
+   * @param readerConfig for the options
+   * @return the file metadata
    */
-  private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
-                                                                   final FileStatus file, final FileSystem fs, boolean allColumnsInteresting, Set<String> columnSet) throws IOException, InterruptedException {
-    final ParquetMetadata metadata;
-    final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
-    final Configuration conf = new Configuration(fs.getConf());
+  public static ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
+                                                                         ParquetMetadata footer,
+                                                                         final FileStatus file,
+                                                                         final FileSystem fs,
+                                                                         boolean allColumnsInteresting,
+                                                                         boolean skipNonInteresting,
+                                                                         Set<String> columnSet,
+                                                                         ParquetReaderConfig readerConfig)
+    throws IOException, InterruptedException {
     Map<ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = new HashMap<>();
     long totalRowCount = 0;
-    try {
-      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
-        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
-          return parquetFileReader.getFooter();
-        }
-      });
-    } catch(Exception e) {
-      logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
-        file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
-      throw e;
+    ParquetMetadata metadata = footer; // if a non-null footer is given, no need to read it again from the file
+    if (metadata == null) {
+      final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+      final Configuration conf = new Configuration(fs.getConf());
+      try {
+        metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) () -> {
+          try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
+            return parquetFileReader.getFooter();
+          }
+        });
+      } catch (Exception e) {
+        logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}", file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
+        throw e;
+      }
     }
-
     MessageType schema = metadata.getFileMetaData().getSchema();
 
     Map<SchemaPath, ColTypeInfo> colTypeInfoMap = new HashMap<>();
@@ -560,6 +584,8 @@ public class Metadata {
       for (ColumnChunkMetaData col : rowGroup.getColumns()) {
         String[] columnName = col.getPath().toArray();
         SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
+        boolean thisColumnIsInteresting = allColumnsInteresting || columnSet == null || columnSet.contains(columnSchemaName.getRootSegmentPath());
+        if ( skipNonInteresting && ! thisColumnIsInteresting ) { continue; }
         ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);
         Statistics<?> stats = col.getStatistics();
         long totalNullCount = stats.getNumNulls();
@@ -578,7 +604,7 @@ public class Metadata {
           long nullCount = totalNullCountMap.get(columnTypeMetadataKey) + totalNullCount;
           totalNullCountMap.put(columnTypeMetadataKey, nullCount);
         }
-        if (allColumnsInteresting || columnSet == null || !allColumnsInteresting && columnSet != null && columnSet.size() > 0 && columnSet.contains(columnSchemaName.getRootSegmentPath())) {
+        if ( thisColumnIsInteresting ) {
           // Save the column schema info. We'll merge it into one list
           Object minValue = null;
           Object maxValue = null;
@@ -629,7 +655,7 @@ public class Metadata {
    * @param length     the length of the row group
    * @return host affinity for the row group
    */
-  private Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
+  private static Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
       throws IOException {
     BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
     Map<String, Float> hostAffinityMap = Maps.newHashMap();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
index 7023a1d..e345ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
@@ -44,6 +44,8 @@ public class Metadata_V4 {
     MetadataSummary metadataSummary = new MetadataSummary();
     FileMetadata fileMetadata = new FileMetadata();
 
+    public ParquetTableMetadata_v4() {}
+
     public ParquetTableMetadata_v4(MetadataSummary metadataSummary) {
       this.metadataSummary = metadataSummary;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 210b8fe..f338d2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -39,11 +39,11 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -66,23 +66,20 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
-public class DrillParquetReader extends AbstractRecordReader {
+public class DrillParquetReader extends CommonParquetRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
   // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
 
   private static final char DEFAULT_RECORDS_TO_READ = 32*1024;
 
-  private ParquetMetadata footer;
   private MessageType schema;
-  private DrillFileSystem fileSystem;
+  private DrillFileSystem drillFileSystem;
   private RowGroupReadEntry entry;
   private ColumnChunkIncReadStore pageReadStore;
   private RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
-  private OperatorContext operatorContext;
-  private FragmentContext fragmentContext;
   /** Configured Parquet records per batch */
   private final int recordsPerBatch;
 
@@ -100,14 +97,12 @@ public class DrillParquetReader extends AbstractRecordReader {
   // See DRILL-4203
   private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates;
 
-  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry,
-      List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+    super(footer, fragmentContext);
     this.containsCorruptedDates = containsCorruptedDates;
-    this.footer = footer;
-    this.fileSystem = fileSystem;
+    this.drillFileSystem = fileSystem;
     this.entry = entry;
     setColumns(columns);
-    this.fragmentContext = fragmentContext;
     this.recordsPerBatch = (int) fragmentContext.getOptions().getLong(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS);
   }
 
@@ -242,9 +237,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-          CodecFactory.createDirectCodecFactory(fileSystem.getConf(),
-              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(),
-          fileSystem, filePath);
+          CodecFactory.createDirectCodecFactory(drillFileSystem.getConf(),
+              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(), drillFileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);
@@ -322,8 +316,9 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   @Override
   public void close() {
+    closeStats(logger, entry.getPath());
     footer = null;
-    fileSystem = null;
+    drillFileSystem = null;
     entry = null;
     recordReader = null;
     recordMaterializer = null;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b2ff4a5..5096680 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -525,6 +525,7 @@ drill.exec.options: {
     exec.storage.enable_new_text_reader: true,
     exec.storage.enable_v3_text_reader: false,
     exec.storage.min_width: 1,
+    exec.storage.skip_runtime_rowgroup_pruning: false,
     exec.udf.enable_dynamic_support: true,
     exec.udf.use_dynamic: true,
     drill.exec.stats.logging.batch_size: false,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index a62dc33..112cfec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -20,17 +20,26 @@ package org.apache.drill.exec.store.parquet;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.expr.stat.RowsMatch;
 import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
 import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.metastore.ColumnStatisticsKind;
 import org.apache.drill.exec.expr.IsPredicate;
 import org.apache.drill.exec.expr.StatisticsProvider;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 import org.junit.AfterClass;
@@ -46,8 +55,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestParquetFilterPushDown extends PlanTestBase {
   private static final String CTAS_TABLE = "order_ctas";
@@ -356,7 +367,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
 
   @Test
   // Test against parquet files from Drill CTAS post 1.8.0 release.
-  public void testDatePredicateAgaistDrillCTASPost1_8() throws Exception {
+  public void testDatePredicateAgainstDrillCTASPost1_8() throws Exception {
     test("use dfs.tmp");
     test("create table `%s/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and " +
       "date '1992-01-03'", CTAS_TABLE);
@@ -699,4 +710,57 @@ public class TestParquetFilterPushDown extends PlanTestBase {
   private MetadataBase.ParquetTableMetadataBase getParquetMetaData(File file) throws IOException {
     return Metadata.getParquetTableMetadata(fs, file.toURI().getPath(), ParquetReaderConfig.getDefaultInstance());
   }
+
+  // =========  runtime pruning  ==========
+  @Rule
+  public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
+  /**
+   *
+   * @throws Exception
+   */
+  private void genericTestRuntimePruning(int maxParallel, String sql, long expectedRows, int numPartitions, int numPruned) throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(baseDirTestWatcher)
+      .sessionOption(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,false)
+      .sessionOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY,0)
+      .maxParallelization(maxParallel)
+      .saveProfiles();
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      runAndCheckResults(client, sql, expectedRows, numPartitions, numPruned);
+    }
+  }
+  /**
+   * Test runtime pruning
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRuntimePruning() throws Exception {
+    // 's' is the partitioning key (values are: 3,4,5,6 ) -- prune out 2 out of 4 rowgroups
+    genericTestRuntimePruning( 2, "select a from cp.`parquet/multirowgroupwithNulls.parquet` where s > 4", 20, 4,2 );
+    // prune out all rowgroups
+    genericTestRuntimePruning( 2, "select a from cp.`parquet/multirowgroupwithNulls.parquet` where s > 8", 0, 4,4 );
+  }
+
+  private void runAndCheckResults(ClientFixture client, String sql, long expectedRows, long numPartitions, long numPruned) throws Exception {
+    QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+
+    if (expectedRows > 0) {
+      assertEquals(expectedRows, summary.recordCount());
+    }
+
+    ProfileParser profile = client.parseProfile(summary.queryIdString());
+    List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE);
+
+    assertTrue(!ops.isEmpty());
+    // check for the first op only
+    ProfileParser.OperatorProfile parquestScan0 = ops.get(0);
+    long resultNumRowgroups = parquestScan0.getMetric(ParquetRecordReader.Metric.NUM_ROWGROUPS.ordinal());
+    assertEquals(numPartitions, resultNumRowgroups);
+    long resultNumPruned = parquestScan0.getMetric(ParquetRecordReader.Metric.ROWGROUPS_PRUNED.ordinal());
+    assertEquals(numPruned,resultNumPruned);
+  }
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
index 6ac08ee..f45912f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
@@ -79,7 +79,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
   public void testPushProjectIntoScanWithExpressionInFilter() throws Exception {
     String query = String.format("select o_orderdate from (select * from `%s`.`%s`) where o_custkey + o_orderkey < 5", DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=3, numRowGroups=3, usedMetadataFile=false, columns=\\[`o_orderdate`, `o_custkey`, `o_orderkey`\\]"};
+    String[] expectedPlan = {"numFiles=3, numRowGroups=3, usedMetadataFile=false,.* columns=\\[`o_orderdate`, " +
+      "`o_custkey`, `o_orderkey`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -115,7 +116,7 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String query = "select t.trans_id from (select * from cp.`store/parquet/complex/complex.parquet`) t " +
       "where t.user_info.cust_id > 28 and t.user_info.device = 'IOS5' and t.marketing_info.camp_id > 5 and t.marketing_info.keywords[2] is not null";
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, " +
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* " +
       "columns=\\[`trans_id`, `user_info`.`cust_id`, `user_info`.`device`, `marketing_info`.`camp_id`, `marketing_info`.`keywords`\\[2\\]\\]"};
     String[] excludedPlan = {};
 
@@ -218,7 +219,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
   public void testFilterPushDownSingleCondition() throws Exception {
     String query = String.format("select * from (select * from `%s`.`%s`) where o_orderdate = date '1992-01-01'", DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -235,7 +237,7 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String query = String.format("select * from (select * from `%s`.`%s`) where o_orderdate = date '1992-01-01' or o_orderdate = date '1992-01-09'",
         DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=2, numRowGroups=2, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=2, numRowGroups=2, usedMetadataFile=false,.* columns=\\[`\\*\\*`, `o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -253,7 +255,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String subQuery = String.format("select * from `%s`.`%s`", DFS_TMP_SCHEMA, TABLE_NAME);
     String query = String.format("select * from (select * from (select * from (%s))) where o_orderdate = date '1992-01-01'", subQuery);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -270,7 +273,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String subQuery = String.format("select * from `%s`.`%s`", DFS_TMP_SCHEMA, TABLE_NAME);
     String query = String.format("select * from (select * from (select *, o_custkey from (%s))) where o_orderdate = date '1992-01-01'", subQuery);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_custkey`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_custkey`, `o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);