Posted to commits@drill.apache.org by so...@apache.org on 2019/05/03 22:46:26 UTC

[drill] branch master updated (422bf20 -> 108ced0)

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 422bf20  [maven-release-plugin] prepare for next development iteration
     new 9dc81e1  DRILL-7062: Initial implementation of run-time rowgroup pruning closes #1738
     new 878cfc0  DRILL-7167: Implemented DESCRIBE TABLE statement
     new de74eab  DRILL-7171: Create metadata directories cache file in the leaf level directories to support ConvertCountToDirectScan optimization. closes #1748
     new d897f70  DRILL-6988. Utility of the too long error message when syntax error
     new caa9831  DRILL-6974: SET option command modification
     new 5862a39  DRILL-7225: Fixed merging ColumnTypeInfo for files with different schemas closes #1773
     new e191c54  DRILL-7228: Upgrade to a newer version of t-digest to address inaccuracies in histogram buckets. closes #1774
     new 108ced0  DRILL-7098: File Metadata Metastore Plugin closes #1754

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  18 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |   3 +-
 exec/java-exec/pom.xml                             |  15 ++
 exec/java-exec/src/main/codegen/data/Parser.tdd    |   6 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  66 +++++-
 .../java/org/apache/drill/exec/ExecConstants.java  |   4 +
 .../org/apache/drill/exec/expr/IsPredicate.java    |   4 +-
 .../drill/exec/expr/fn/impl/TDigestFunctions.java  | 140 +++++------
 .../base/AbstractGroupScanWithMetadata.java        |  56 +++--
 .../apache/drill/exec/physical/base/GroupScan.java |   2 -
 .../base/SimpleFileTableMetadataProvider.java      |   2 +-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   4 +-
 .../impl/statistics/AbstractMergedStatistic.java   |   2 +-
 .../impl/statistics/TDigestMergedStatistic.java    |  12 +-
 .../planner/common/NumericEquiDepthHistogram.java  |   4 +-
 .../logical/ConvertCountToDirectScanRule.java      |   4 +-
 .../physical/ConvertCountToDirectScanPrule.java    |   3 +-
 .../exec/planner/physical/PlannerSettings.java     |   6 +-
 .../drill/exec/planner/sql/DrillSqlWorker.java     |  22 +-
 .../drill/exec/planner/sql/SqlConverter.java       | 104 ++++----
 .../sql/handlers/AbstractSqlSetHandler.java        |  86 +++++++
 .../planner/sql/handlers/ResetOptionHandler.java   |  73 ++++++
 .../planner/sql/handlers/SetOptionHandler.java     | 119 ++++------
 .../sql/parser/CompoundIdentifierConverter.java    |   4 +-
 .../exec/planner/sql/parser/DrillParserUtil.java   |   5 +
 .../planner/sql/parser/DrillSqlResetOption.java    | 103 ++++++++
 ...SqlDropFunction.java => DrillSqlSetOption.java} |  60 +++--
 .../sql/parser/impl/DrillSqlParseException.java    | 105 +++++++++
 .../exec/record/metadata/MapColumnMetadata.java    |   3 +-
 .../drill/exec/record/metadata/MetadataUtils.java  |  37 +++
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../exec/store/CommonParquetRecordReader.java      |  85 +++++++
 .../store/parquet/AbstractParquetGroupScan.java    |  14 +-
 .../store/parquet/AbstractParquetRowGroupScan.java |  18 +-
 .../parquet/AbstractParquetScanBatchCreator.java   | 262 +++++++++++++++++----
 .../store/parquet/BaseParquetMetadataProvider.java |  11 +-
 .../drill/exec/store/parquet/ParquetGroupScan.java |   3 +-
 .../store/parquet/ParquetGroupScanStatistics.java  |   8 +-
 .../exec/store/parquet/ParquetPushDownFilter.java  |  38 ++-
 .../exec/store/parquet/ParquetReaderStats.java     |  13 +-
 .../exec/store/parquet/ParquetRowGroupScan.java    |  23 +-
 .../store/parquet/ParquetTableMetadataUtils.java   |  19 +-
 .../parquet/columnreaders/ParquetRecordReader.java |  64 +----
 .../store/parquet/columnreaders/ReadState.java     |   7 +
 .../batchsizing/RecordBatchSizerManager.java       |   3 +-
 .../exec/store/parquet/metadata/Metadata.java      |  92 +++++---
 .../store/parquet/metadata/MetadataPathUtils.java  |  13 +-
 .../exec/store/parquet/metadata/Metadata_V4.java   |   2 +
 .../exec/store/parquet2/DrillParquetReader.java    |  25 +-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../logical/TestConvertCountToDirectScan.java      |  98 ++++++--
 .../drill/exec/planner/sql/TestDrillSQLWorker.java |  33 +--
 .../sql/handlers/ResetOptionHandlerTest.java       |  64 +++++
 .../planner/sql/handlers/SetOptionHandlerTest.java |  90 +++++++
 .../org/apache/drill/exec/sql/TestInfoSchema.java  |  62 ++++-
 .../store/parquet/TestParquetFilterPushDown.java   |  66 +++++-
 .../store/parquet/TestParquetMetadataCache.java    |  17 ++
 .../TestPushDownAndPruningWithItemStar.java        |  16 +-
 exec/jdbc-all/pom.xml                              |   4 +-
 .../org/apache/drill/jdbc/impl/DrillMetaImpl.java  |  17 +-
 {contrib/data => metastore/file-metadata}/pom.xml  |  19 +-
 .../exec/physical/base/TableMetadataProvider.java  |   0
 .../base/TableMetadataProviderBuilder.java         |   0
 .../org/apache/drill/metastore/FileMetadata.java   |   0
 .../apache/drill/metastore/FileTableMetadata.java  |   0
 {tools => metastore/metastore-api}/pom.xml         |  16 +-
 .../drill/exec/expr/ExactStatisticsConstants.java  |   0
 .../exec/physical/impl/statistics/Statistic.java   |  36 +--
 .../exec/record/metadata/SchemaPathUtils.java      |  34 ---
 .../org/apache/drill/metastore/BaseMetadata.java   |   0
 .../metastore/CollectableColumnStatisticsKind.java |   0
 .../metastore/CollectableTableStatisticsKind.java  |   0
 .../apache/drill/metastore/ColumnStatistics.java   |   0
 .../drill/metastore/ColumnStatisticsImpl.java      |   0
 .../drill/metastore/ColumnStatisticsKind.java      |   5 +-
 .../apache/drill/metastore/LocationProvider.java   |   0
 .../apache/drill/metastore/PartitionMetadata.java  |   0
 .../apache/drill/metastore/RowGroupMetadata.java   |   0
 .../org/apache/drill/metastore/StatisticsKind.java |   0
 .../org/apache/drill/metastore/TableMetadata.java  |   0
 .../drill/metastore/TableStatisticsKind.java       |   7 +-
 {exec/rpc => metastore}/pom.xml                    |  62 ++---
 pom.xml                                            |   1 +
 83 files changed, 1772 insertions(+), 649 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlSetHandler.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandler.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlResetOption.java
 copy exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/{SqlDropFunction.java => DrillSqlSetOption.java} (55%)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillSqlParseException.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandlerTest.java
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java
 copy {contrib/data => metastore/file-metadata}/pom.xml (75%)
 rename {exec/java-exec => metastore/file-metadata}/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProvider.java (100%)
 rename {exec/java-exec => metastore/file-metadata}/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProviderBuilder.java (100%)
 rename {exec/java-exec => metastore/file-metadata}/src/main/java/org/apache/drill/metastore/FileMetadata.java (100%)
 rename {exec/java-exec => metastore/file-metadata}/src/main/java/org/apache/drill/metastore/FileTableMetadata.java (100%)
 copy {tools => metastore/metastore-api}/pom.xml (78%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/exec/expr/ExactStatisticsConstants.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java (54%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java (55%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/BaseMetadata.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/CollectableColumnStatisticsKind.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/CollectableTableStatisticsKind.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/ColumnStatistics.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/ColumnStatisticsImpl.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java (96%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/LocationProvider.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/PartitionMetadata.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/RowGroupMetadata.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/StatisticsKind.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/TableMetadata.java (100%)
 rename {exec/java-exec => metastore/metastore-api}/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java (94%)
 copy {exec/rpc => metastore}/pom.xml (63%)


[drill] 04/08: DRILL-6988. Utility of the too long error message when syntax error

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d897f709e60557be38deff2276118c8a0b0196c5
Author: Dmytriy Grinchenko <dm...@gmail.com>
AuthorDate: Wed Apr 17 12:57:39 2019 +0300

    DRILL-6988. Utility of the too long error message when syntax error
    
    - Added a Drill wrapper around SqlParseException to customize the messages produced by Calcite
    - Fixed the Drill SQL parse exception formatter so it calculates the proper position of the "^" character
    closes #1753
---
 .../drill/exec/planner/sql/SqlConverter.java       | 104 +++++++++++---------
 .../exec/planner/sql/parser/DrillParserUtil.java   |   5 +
 .../sql/parser/impl/DrillSqlParseException.java    | 105 +++++++++++++++++++++
 .../drill/exec/planner/sql/TestDrillSQLWorker.java |  33 ++++---
 4 files changed, 189 insertions(+), 58 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 237b57b..17b0490 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -26,6 +26,10 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.calcite.util.Static;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
+import org.apache.drill.exec.planner.sql.parser.impl.DrillSqlParseException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.MetadataProviderManager;
 import org.apache.drill.exec.physical.base.TableMetadataProvider;
@@ -59,6 +63,7 @@ import org.apache.calcite.runtime.Hook;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -90,7 +95,6 @@ import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import static org.apache.calcite.util.Static.RESOURCE;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 import org.apache.drill.exec.store.ColumnExplorer;
@@ -106,6 +110,7 @@ public class SqlConverter {
 
   private final JavaTypeFactory typeFactory;
   private final SqlParser.Config parserConfig;
+
   // Allow the default config to be modified using immutable configs
   private SqlToRelConverter.Config sqlToRelConverterConfig;
   private final DrillCalciteCatalogReader catalog;
@@ -143,7 +148,7 @@ public class SqlConverter {
     this.sqlToRelConverterConfig = new SqlToRelConverterConfig();
     this.isInnerQuery = false;
     this.typeFactory = new JavaTypeFactoryImpl(DRILL_TYPE_SYSTEM);
-    this.defaultSchema =  context.getNewDefaultSchema();
+    this.defaultSchema = context.getNewDefaultSchema();
     this.rootSchema = rootSchema(defaultSchema);
     this.temporarySchema = context.getConfig().getString(ExecConstants.DEFAULT_TEMPORARY_WORKSPACE);
     this.session = context.getSession();
@@ -191,15 +196,15 @@ public class SqlConverter {
       SqlParser parser = SqlParser.create(sql, parserConfig);
       return parser.parseStmt();
     } catch (SqlParseException e) {
+      DrillSqlParseException dex = new DrillSqlParseException(e);
       UserException.Builder builder = UserException
-          .parseError(e)
-          .addContext("SQL Query", formatSQLParsingError(sql, e.getPos()));
+          .parseError(dex)
+          .addContext(formatSQLParsingError(sql, dex));
       if (isInnerQuery) {
         builder.message("Failure parsing a view your query is dependent upon.");
       }
       throw builder.build(logger);
     }
-
   }
 
   public SqlNode validate(final SqlNode parsedNode) {
@@ -265,26 +270,25 @@ public class SqlConverter {
         SqlNode node,
         RelDataType targetRowType,
         SqlValidatorScope scope) {
-      switch (node.getKind()) {
-        case AS:
-          SqlNode sqlNode = ((SqlCall) node).operand(0);
-          switch (sqlNode.getKind()) {
-            case IDENTIFIER:
-              SqlIdentifier tempNode = (SqlIdentifier) sqlNode;
-              DrillCalciteCatalogReader catalogReader = (SqlConverter.DrillCalciteCatalogReader) getCatalogReader();
-
-              changeNamesIfTableIsTemporary(tempNode);
-
-              // Check the schema and throw a valid SchemaNotFound exception instead of TableNotFound exception.
-              if (catalogReader.getTable(tempNode.names) == null) {
-                catalogReader.isValidSchema(tempNode.names);
-              }
-              break;
-            case UNNEST:
-              if (((SqlCall) node).operandCount() < 3) {
-                throw RESOURCE.validationError("Alias table and column name are required for UNNEST").ex();
-              }
-          }
+      if (node.getKind() == SqlKind.AS) {
+        SqlNode sqlNode = ((SqlCall) node).operand(0);
+        switch (sqlNode.getKind()) {
+          case IDENTIFIER:
+            SqlIdentifier tempNode = (SqlIdentifier) sqlNode;
+            DrillCalciteCatalogReader catalogReader = (DrillCalciteCatalogReader) getCatalogReader();
+
+            changeNamesIfTableIsTemporary(tempNode);
+
+            // Check the schema and throw a valid SchemaNotFound exception instead of TableNotFound exception.
+            if (catalogReader.getTable(tempNode.names) == null) {
+              catalogReader.isValidSchema(tempNode.names);
+            }
+            break;
+          case UNNEST:
+            if (((SqlCall) node).operandCount() < 3) {
+              throw Static.RESOURCE.validationError("Alias table and column name are required for UNNEST").ex();
+            }
+        }
       }
       super.validateFrom(node, targetRowType, scope);
     }
@@ -493,30 +497,44 @@ public class SqlConverter {
   }
 
   /**
+   * Formats sql exception with context name included and with
+   * graphical representation for the {@link DrillSqlParseException}
    *
-   * @param sql
-   *          the SQL sent to the server
-   * @param pos
-   *          the position of the error
+   * @param sql     the SQL sent to the server
+   * @param ex      exception object
    * @return The sql with a ^ character under the error
    */
-  static String formatSQLParsingError(String sql, SqlParserPos pos) {
-    if (pos == null) {
-      return sql;
-    }
-    StringBuilder sb = new StringBuilder();
-    String[] lines = sql.split("\n");
-    for (int i = 0; i < lines.length; i++) {
-      String line = lines[i];
-      sb.append(line).append("\n");
-      if (i == (pos.getLineNum() - 1)) {
-        for (int j = 0; j < pos.getColumnNum() - 1; j++) {
-          sb.append(" ");
+  static String formatSQLParsingError(String sql, DrillSqlParseException ex) {
+    final String sqlErrorMessageHeader = "SQL Query: ";
+    final SqlParserPos pos = ex.getPos();
+
+    if (pos != null) {
+      int issueLineNumber = pos.getLineNum() - 1;  // recalculates to base 0
+      int issueColumnNumber = pos.getColumnNum() - 1;  // recalculates to base 0
+      int messageHeaderLength = sqlErrorMessageHeader.length();
+
+      // If the issue happens on the first line, header width should be calculated alongside with the sql query
+      int shiftLength = (issueLineNumber == 0) ? issueColumnNumber + messageHeaderLength : issueColumnNumber;
+
+      StringBuilder sb = new StringBuilder();
+      String[] lines = sql.split(DrillParserUtil.EOL);
+
+      for (int i = 0; i < lines.length; i++) {
+        sb.append(lines[i]);
+
+        if (i == issueLineNumber) {
+          sb
+              .append(DrillParserUtil.EOL)
+              .append(StringUtils.repeat(' ', shiftLength))
+              .append("^");
+        }
+        if (i < lines.length - 1) {
+          sb.append(DrillParserUtil.EOL);
         }
-        sb.append("^\n");
       }
+      sql = sb.toString();
     }
-    return sb.toString();
+    return sqlErrorMessageHeader + sql;
   }
 
   private static SchemaPlus rootSchema(SchemaPlus schema) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
index ee1106d..06e5881 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillParserUtil.java
@@ -32,6 +32,11 @@ public class DrillParserUtil {
 
   private static final int CONDITION_LIST_CAPACITY = 3;
 
+  /**
+   * System-depended end of line character
+   */
+  public static final String EOL = System.lineSeparator();
+
   public static SqlNode createCondition(SqlNode left, SqlOperator op, SqlNode right) {
 
     // if one of the operands is null, return the other
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillSqlParseException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillSqlParseException.java
new file mode 100644
index 0000000..f248241
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillSqlParseException.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser.impl;
+
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Customized {@link SqlParseException} class
+ */
+public class DrillSqlParseException extends SqlParseException {
+  private final ParseException parseException;
+
+  public DrillSqlParseException(String message, SqlParserPos pos, int[][] expectedTokenSequences,
+                                String[] tokenImages, Throwable ex) {
+    super(message, pos, expectedTokenSequences, tokenImages, ex);
+
+    parseException = (ex instanceof ParseException) ? (ParseException) ex : null;
+  }
+
+  public DrillSqlParseException(SqlParseException sqlParseException) {
+    this(sqlParseException.getMessage(), sqlParseException.getPos(), sqlParseException.getExpectedTokenSequences(),
+        sqlParseException.getTokenImages(), sqlParseException.getCause());
+  }
+
+  /**
+   * Builds error message just like the original {@link SqlParseException}
+   * with special handling for {@link ParseException} class.
+   * <p>
+   * This is customized implementation of the original {@link ParseException#getMessage()}.
+   * Any other underlying {@link SqlParseException} exception messages goes through without changes
+   * <p>
+   * <p>
+   * Example:
+   * <pre>
+   *
+   *   Given query: SELECT FROM (VALUES(1));
+   *
+   *   Generated message for the unsupported FROM token after SELECT would look like:
+   *
+   *       Encountered "FROM" at line 1, column 8.
+   *</pre>
+   * @return formatted string representation of {@link DrillSqlParseException}
+   */
+  @Override
+  public String getMessage() {
+    // proxy the original message if exception does not belongs
+    // to ParseException or no current token passed
+    if (parseException == null || parseException.currentToken == null) {
+      return super.getMessage();
+    }
+
+    int[][] expectedTokenSequences = getExpectedTokenSequences();
+    String[] tokenImage = getTokenImages();
+
+    int maxSize = 0;  // holds max possible length of the token sequence
+    for (int[] expectedTokenSequence : expectedTokenSequences) {
+      if (maxSize < expectedTokenSequence.length) {
+        maxSize = expectedTokenSequence.length;
+      }
+    }
+
+    // parseException.currentToken points to the last successfully parsed token, next one is considered as fail reason
+    Token tok = parseException.currentToken.next;
+    StringBuilder sb = new StringBuilder("Encountered \"");
+
+    // Adds unknown token sequences to the message
+    for (int i = 0; i < maxSize; i++) {
+      if (i != 0) {
+        sb.append(" ");
+      }
+
+      if (tok.kind == DrillParserImplConstants.EOF) {
+        sb.append(tokenImage[0]);
+        break;
+      }
+      sb.append(parseException.add_escapes(tok.image));
+      tok = tok.next;
+    }
+
+    sb
+        .append("\" at line ")
+        .append(parseException.currentToken.beginLine)
+        .append(", column ")
+        .append(parseException.currentToken.next.beginColumn)
+        .append(".");
+
+    return sb.toString();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java
index 57c5de5..941cdfc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.calcite.avatica.util.Quoting;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.parser.impl.DrillSqlParseException;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.SqlTest;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -31,28 +32,30 @@ import org.junit.experimental.categories.Category;
 public class TestDrillSQLWorker extends BaseTestQuery {
 
   private void validateFormattedIs(String sql, SqlParserPos pos, String expected) {
-    String formatted = SqlConverter.formatSQLParsingError(sql, pos);
+    DrillSqlParseException ex = new DrillSqlParseException(null, pos, null, null, null);
+    String formatted = SqlConverter.formatSQLParsingError(sql, ex);
     assertEquals(expected, formatted);
   }
 
   @Test
-  public void testErrorFormating() {
-    String sql = "Select * from Foo\nwhere tadadidada;\n";
+  public void testErrorFormatting() {
+    String sql = "Select * from Foo\n"
+        + "where tadadidada;\n";
     validateFormattedIs(sql, new SqlParserPos(1, 2),
-        "Select * from Foo\n"
-      + " ^\n"
-      + "where tadadidada;\n");
+        "SQL Query: Select * from Foo\n"
+            + "            ^\n"
+            + "where tadadidada;");
     validateFormattedIs(sql, new SqlParserPos(2, 2),
-        "Select * from Foo\n"
-      + "where tadadidada;\n"
-      + " ^\n" );
+        "SQL Query: Select * from Foo\n"
+            + "where tadadidada;\n"
+            + " ^" );
     validateFormattedIs(sql, new SqlParserPos(1, 10),
-        "Select * from Foo\n"
-      + "         ^\n"
-      + "where tadadidada;\n");
-    validateFormattedIs(sql, new SqlParserPos(-11, -10), sql);
-    validateFormattedIs(sql, new SqlParserPos(0, 10), sql);
-    validateFormattedIs(sql, new SqlParserPos(100, 10), sql);
+        "SQL Query: Select * from Foo\n"
+            + "                    ^\n"
+            + "where tadadidada;");
+    validateFormattedIs(sql, new SqlParserPos(-11, -10), "SQL Query: Select * from Foo\nwhere tadadidada;");
+    validateFormattedIs(sql, new SqlParserPos(0, 10), "SQL Query: Select * from Foo\nwhere tadadidada;");
+    validateFormattedIs(sql, new SqlParserPos(100, 10), "SQL Query: Select * from Foo\nwhere tadadidada;");
   }
 
   @Test


[drill] 08/08: DRILL-7098: File Metadata Metastore Plugin closes #1754

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 108ced0ea10dc379f2b60b32698b6aa26683b585
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Wed Apr 17 15:46:54 2019 +0300

    DRILL-7098: File Metadata Metastore Plugin
    closes #1754
---
 exec/java-exec/pom.xml                             | 10 +++
 .../org/apache/drill/exec/expr/IsPredicate.java    |  4 +-
 .../base/AbstractGroupScanWithMetadata.java        | 13 ++--
 .../apache/drill/exec/physical/base/GroupScan.java |  2 -
 .../base/SimpleFileTableMetadataProvider.java      |  2 +-
 .../impl/statistics/AbstractMergedStatistic.java   |  2 +-
 .../logical/ConvertCountToDirectScanRule.java      |  4 +-
 .../physical/ConvertCountToDirectScanPrule.java    |  3 +-
 .../exec/record/metadata/MapColumnMetadata.java    |  3 +-
 .../drill/exec/record/metadata/MetadataUtils.java  | 37 +++++++++++
 .../store/parquet/AbstractParquetGroupScan.java    |  3 +-
 .../store/parquet/BaseParquetMetadataProvider.java | 11 ++--
 .../store/parquet/ParquetGroupScanStatistics.java  |  8 +--
 .../store/parquet/ParquetTableMetadataUtils.java   | 10 +--
 .../store/parquet/metadata/MetadataPathUtils.java  | 13 ++--
 exec/jdbc-all/pom.xml                              |  4 +-
 .../org/apache/drill/jdbc/impl/DrillMetaImpl.java  | 17 ++---
 metastore/file-metadata/pom.xml                    | 41 ++++++++++++
 .../exec/physical/base/TableMetadataProvider.java  |  0
 .../base/TableMetadataProviderBuilder.java         |  0
 .../org/apache/drill/metastore/FileMetadata.java   |  0
 .../apache/drill/metastore/FileTableMetadata.java  |  0
 metastore/metastore-api/pom.xml                    | 33 ++++++++++
 .../drill/exec/expr/ExactStatisticsConstants.java  |  0
 .../exec/physical/impl/statistics/Statistic.java   | 36 ++++++-----
 .../exec/record/metadata/SchemaPathUtils.java      | 34 ----------
 .../org/apache/drill/metastore/BaseMetadata.java   |  0
 .../metastore/CollectableColumnStatisticsKind.java |  0
 .../metastore/CollectableTableStatisticsKind.java  |  0
 .../apache/drill/metastore/ColumnStatistics.java   |  0
 .../drill/metastore/ColumnStatisticsImpl.java      |  0
 .../drill/metastore/ColumnStatisticsKind.java      |  5 +-
 .../apache/drill/metastore/LocationProvider.java   |  0
 .../apache/drill/metastore/PartitionMetadata.java  |  0
 .../apache/drill/metastore/RowGroupMetadata.java   |  0
 .../org/apache/drill/metastore/StatisticsKind.java |  0
 .../org/apache/drill/metastore/TableMetadata.java  |  0
 .../drill/metastore/TableStatisticsKind.java       |  7 +-
 metastore/pom.xml                                  | 75 ++++++++++++++++++++++
 pom.xml                                            |  1 +
 40 files changed, 268 insertions(+), 110 deletions(-)

diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index c35e935..366b174 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -287,6 +287,16 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-file-metastore-plugin</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.beust</groupId>
       <artifactId>jcommander</artifactId>
       <version>1.30</version>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
index c30879d..37a8d0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.expr;
 
 import org.apache.drill.exec.expr.stat.RowsMatch;
-import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.metastore.ColumnStatisticsKind;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -76,7 +76,7 @@ public class IsPredicate<C extends Comparable<C>> extends LogicalExpressionBase
         || !stat.containsStatistic(ColumnStatisticsKind.MIN_VALUE)
         || !stat.containsStatistic(ColumnStatisticsKind.MAX_VALUE)
         || !stat.containsStatistic(ColumnStatisticsKind.NULLS_COUNT)
-        || (long) stat.getStatistic(ColumnStatisticsKind.NULLS_COUNT) == GroupScan.NO_COLUMN_STATS;
+        || (long) stat.getStatistic(ColumnStatisticsKind.NULLS_COUNT) == Statistic.NO_COLUMN_STATS;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 15e1387..a547fb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.stat.RowsMatch;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -155,13 +156,13 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     long colNulls;
     if (columnStats != null) {
       Long nulls = (Long) columnStats.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
-      colNulls = nulls != null ? nulls : GroupScan.NO_COLUMN_STATS;
+      colNulls = nulls != null ? nulls : Statistic.NO_COLUMN_STATS;
     } else {
       return 0;
     }
-    return GroupScan.NO_COLUMN_STATS == tableRowCount
-        || GroupScan.NO_COLUMN_STATS == colNulls
-        ? GroupScan.NO_COLUMN_STATS : tableRowCount - colNulls;
+    return Statistic.NO_COLUMN_STATS == tableRowCount
+        || Statistic.NO_COLUMN_STATS == colNulls
+        ? Statistic.NO_COLUMN_STATS : tableRowCount - colNulls;
   }
 
   @Override
@@ -363,7 +364,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     GroupScanWithMetadataFilterer prunedMetadata = getFilterer();
     if (getTableMetadata() != null) {
       long tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
-      if (tableRowCount == NO_COLUMN_STATS || tableRowCount <= maxRecords) {
+      if (tableRowCount == Statistic.NO_COLUMN_STATS || tableRowCount <= maxRecords) {
         logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
             tableRowCount, maxRecords);
         return null;
@@ -428,7 +429,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     int currentRowCount = 0;
     for (T metadata : metadataList) {
       long rowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(metadata);
-      if (rowCount == NO_COLUMN_STATS) {
+      if (rowCount == Statistic.NO_COLUMN_STATS) {
         return null;
       } else if (currentRowCount + rowCount <= maxRecords) {
         currentRowCount += rowCount;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index ce74da1..cd49a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -46,8 +46,6 @@ public interface GroupScan extends Scan, HasAffinity {
    */
   List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);
 
-  long NO_COLUMN_STATS = -1;
-
   void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;
 
   SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java
index 98c0c19..1efad36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java
@@ -124,7 +124,7 @@ public class SimpleFileTableMetadataProvider implements TableMetadataProvider {
 
     @Override
     @SuppressWarnings("unchecked")
-    public TableMetadataProvider build() throws IOException {
+    public TableMetadataProvider build() {
       SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
       TableMetadataProvider source = metadataProviderManager.getTableMetadataProvider();
       if (source == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/AbstractMergedStatistic.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/AbstractMergedStatistic.java
index a4ee74f..4e9a762 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/AbstractMergedStatistic.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/AbstractMergedStatistic.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.statistics;
 
 import org.apache.drill.exec.vector.complex.MapVector;
 
-public abstract class AbstractMergedStatistic extends Statistic implements MergedStatistic {
+public abstract class AbstractMergedStatistic implements MergedStatistic, Statistic {
   protected String name;
   protected String inputName;
   protected double samplePercent;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
index fd84e20..7375499 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ConvertCountToDirectScanRule.java
@@ -31,8 +31,8 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.exec.planner.common.CountToDirectScanUtils;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 
@@ -288,7 +288,7 @@ public class ConvertCountToDirectScanRule extends RelOptRule {
 
           Metadata_V4.ColumnTypeMetadata_v4 columnMetadata = metadataSummary.getColumnTypeInfo(new Metadata_V4.ColumnTypeMetadata_v4.Key(simplePath));
 
-         if (columnMetadata == null || columnMetadata.totalNullCount == GroupScan.NO_COLUMN_STATS) {
+         if (columnMetadata == null || columnMetadata.totalNullCount == Statistic.NO_COLUMN_STATS) {
             // if column stats is not available don't apply this rule, return empty counts
             return ImmutableMap.of();
           } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
index 4176950..8d7022e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScanPrule.java
@@ -33,6 +33,7 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.exec.planner.logical.DrillAggregateRel;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -202,7 +203,7 @@ public class ConvertCountToDirectScanPrule extends Prule {
           }
 
           cnt = oldGrpScan.getColumnValueCount(simplePath);
-          if (cnt == GroupScan.NO_COLUMN_STATS) {
+          if (cnt == Statistic.NO_COLUMN_STATS) {
             // if column stats is not available don't apply this rule, return empty counts
             return ImmutableMap.of();
           }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java
index bf1f9af..f9c05f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MapColumnMetadata.java
@@ -64,8 +64,7 @@ public class MapColumnMetadata extends AbstractColumnMetadata {
     mapSchema = (TupleSchema) from.mapSchema.copy();
   }
 
-  public MapColumnMetadata(String name, DataMode mode,
-      TupleSchema mapSchema) {
+  public MapColumnMetadata(String name, DataMode mode, TupleSchema mapSchema) {
     super(name, MinorType.MAP, mode);
     if (mapSchema == null) {
       this.mapSchema = new TupleSchema();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
index 21577c0..f1cf45a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/MetadataUtils.java
@@ -19,6 +19,10 @@ package org.apache.drill.exec.record.metadata;
 
 import java.util.List;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -203,4 +207,37 @@ public class MetadataUtils {
         .build();
     return new PrimitiveColumnMetadata(field);
   }
+
+  /**
+   * Adds column with specified schema path and type into specified {@code TupleMetadata schema}.
+   *
+   * @param schema     tuple schema where column should be added
+   * @param schemaPath schema path of the column which should be added
+   * @param type       type of the column which should be added
+   */
+  public static void addColumnMetadata(TupleMetadata schema, SchemaPath schemaPath, TypeProtos.MajorType type) {
+    PathSegment.NameSegment colPath = schemaPath.getUnIndexed().getRootSegment();
+    ColumnMetadata colMetadata;
+
+    while (!colPath.isLastPath()) {
+      colMetadata = schema.metadata(colPath.getPath());
+      if (colMetadata == null) {
+        colMetadata = MetadataUtils.newMap(colPath.getPath(), null);
+        schema.addColumn(colMetadata);
+      }
+      if (!colMetadata.isMap()) {
+        throw new DrillRuntimeException(String.format("Expected map, but was %s", colMetadata.majorType()));
+      }
+
+      schema = colMetadata.mapSchema();
+      colPath = (PathSegment.NameSegment) colPath.getChild();
+    }
+
+    colMetadata = schema.metadata(colPath.getPath());
+    if (colMetadata == null) {
+      schema.addColumn(new PrimitiveColumnMetadata(MaterializedField.create(colPath.getPath(), type)));
+    } else if (!colMetadata.majorType().equals(type)) {
+      throw new DrillRuntimeException(String.format("Types mismatch: existing type: %s, new type: %s", colMetadata.majorType(), type));
+    }
+  }
 }
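
As a rough usage sketch of the helper added above (illustrative only; Types.required comes from org.apache.drill.common.types.Types, and the path and type are hypothetical), a nested path such as a.b.c creates map columns "a" and "b" plus a primitive column "c":

    // Hypothetical example of MetadataUtils.addColumnMetadata with a nested schema path.
    TupleMetadata schema = new TupleSchema();
    SchemaPath path = SchemaPath.getCompoundPath("a", "b", "c");
    MetadataUtils.addColumnMetadata(schema, path, Types.required(TypeProtos.MinorType.INT));
    // schema now holds map "a" containing map "b" containing a required INT column "c".
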
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index e368bb3..34cf354 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.drill.common.expression.ExpressionStringBuilder;
 import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
 import org.apache.drill.exec.physical.base.ParquetMetadataProvider;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.metastore.BaseMetadata;
 import org.apache.drill.metastore.LocationProvider;
 import org.apache.drill.metastore.PartitionMetadata;
@@ -330,7 +331,7 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
     maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
     if (getTableMetadata() != null) {
       long tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
-      if (tableRowCount == NO_COLUMN_STATS || tableRowCount <= maxRecords) {
+      if (tableRowCount == Statistic.NO_COLUMN_STATS || tableRowCount <= maxRecords) {
         logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
             tableRowCount, maxRecords);
         return null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
index 29126b3..1877356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
@@ -17,9 +17,10 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.ParquetMetadataProvider;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.metastore.BaseMetadata;
 import org.apache.drill.metastore.ColumnStatisticsImpl;
@@ -190,12 +191,12 @@ public abstract class BaseParquetMetadataProvider implements ParquetMetadataProv
 
       if (this.schema == null) {
         schema = new TupleSchema();
-        fields.forEach((schemaPath, majorType) -> SchemaPathUtils.addColumnMetadata(schema, schemaPath, majorType));
+        fields.forEach((schemaPath, majorType) -> MetadataUtils.addColumnMetadata(schema, schemaPath, majorType));
       } else {
         // merges specified schema with schema from table
         fields.forEach((schemaPath, majorType) -> {
           if (SchemaPathUtils.getColumnMetadata(schemaPath, schema) == null) {
-            SchemaPathUtils.addColumnMetadata(schema, schemaPath, majorType);
+            MetadataUtils.addColumnMetadata(schema, schemaPath, majorType);
           }
         });
       }
@@ -306,8 +307,8 @@ public abstract class BaseParquetMetadataProvider implements ParquetMetadataProv
             statistics.put(ColumnStatisticsKind.MIN_VALUE, partitionKey);
             statistics.put(ColumnStatisticsKind.MAX_VALUE, partitionKey);
 
-            statistics.put(ColumnStatisticsKind.NULLS_COUNT, GroupScan.NO_COLUMN_STATS);
-            statistics.put(TableStatisticsKind.ROW_COUNT, GroupScan.NO_COLUMN_STATS);
+            statistics.put(ColumnStatisticsKind.NULLS_COUNT, Statistic.NO_COLUMN_STATS);
+            statistics.put(TableStatisticsKind.ROW_COUNT, Statistic.NO_COLUMN_STATS);
             columnsStatistics.put(partitionColumn,
                 new ColumnStatisticsImpl<>(statistics,
                         ParquetTableMetadataUtils.getComparator(getParquetGroupScanStatistics().getTypeForColumn(partitionColumn).getMinorType())));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
index d53c038..c53cee9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.parquet;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.SchemaPathUtils;
 import org.apache.drill.metastore.BaseMetadata;
@@ -109,10 +109,10 @@ public class ParquetGroupScanStatistics<T extends BaseMetadata & LocationProvide
           previousCount = emptyCount;
         }
         Long nullsNum = (Long) statistics.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
-        if (previousCount.longValue() != GroupScan.NO_COLUMN_STATS && nullsNum != null && nullsNum != GroupScan.NO_COLUMN_STATS) {
+        if (previousCount.longValue() != Statistic.NO_COLUMN_STATS && nullsNum != null && nullsNum != Statistic.NO_COLUMN_STATS) {
           previousCount.add(localRowCount - nullsNum);
         } else {
-          previousCount.setValue(GroupScan.NO_COLUMN_STATS);
+          previousCount.setValue(Statistic.NO_COLUMN_STATS);
         }
         ColumnMetadata columnMetadata = SchemaPathUtils.getColumnMetadata(schemaPath, metadata.getSchema());
         TypeProtos.MajorType majorType = columnMetadata != null ? columnMetadata.majorType() : null;
@@ -207,7 +207,7 @@ public class ParquetGroupScanStatistics<T extends BaseMetadata & LocationProvide
 
   private boolean isSingleVal(ColumnStatistics columnStatistics, long rowCount) {
     Long numNulls = (Long) columnStatistics.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
-    if (numNulls != null && numNulls != GroupScan.NO_COLUMN_STATS) {
+    if (numNulls != null && numNulls != Statistic.NO_COLUMN_STATS) {
       Object min = columnStatistics.getStatistic(ColumnStatisticsKind.MIN_VALUE);
       Object max = columnStatistics.getStatistic(ColumnStatisticsKind.MAX_VALUE);
       if (min != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index b8a912d..8df6585 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.record.metadata.SchemaPathUtils;
+import org.apache.drill.exec.physical.impl.statistics.Statistic;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.resolver.TypeCastRules;
@@ -155,7 +155,7 @@ public class ParquetTableMetadataUtils {
     Map<SchemaPath, TypeProtos.MajorType> columns = getRowGroupFields(tableMetadata, rowGroupMetadata);
 
     TupleSchema schema = new TupleSchema();
-    columns.forEach((schemaPath, majorType) -> SchemaPathUtils.addColumnMetadata(schema, schemaPath, majorType));
+    columns.forEach((schemaPath, majorType) -> MetadataUtils.addColumnMetadata(schema, schemaPath, majorType));
 
     return new RowGroupMetadata(
         schema, columnsStatistics, rowGroupStatistics, rowGroupMetadata.getHostAffinity(), rgIndexInFile, location);
@@ -275,7 +275,7 @@ public class ParquetTableMetadataUtils {
 
       Long nulls = column.getNulls();
       if (!column.isNumNullsSet() || nulls == null) {
-        nulls = GroupScan.NO_COLUMN_STATS;
+        nulls = Statistic.NO_COLUMN_STATS;
       }
       PrimitiveType.PrimitiveTypeName primitiveType = getPrimitiveTypeName(tableMetadata, column);
       OriginalType originalType = getOriginalType(tableMetadata, column);
@@ -309,7 +309,7 @@ public class ParquetTableMetadataUtils {
         SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
         if (!schemaPaths.contains(schemaPath)) {
           Map<StatisticsKind, Object> statistics = new HashMap<>();
-          statistics.put(ColumnStatisticsKind.NULLS_COUNT, GroupScan.NO_COLUMN_STATS);
+          statistics.put(ColumnStatisticsKind.NULLS_COUNT, Statistic.NO_COLUMN_STATS);
           PrimitiveType.PrimitiveTypeName primitiveType = columnTypeMetadata.primitiveType;
           OriginalType originalType = columnTypeMetadata.originalType;
           Comparator comparator = getComparator(primitiveType, originalType);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
index b94f135..2cad8e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet.metadata;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.hadoop.fs.Path;
 
@@ -44,10 +43,9 @@ public class MetadataPathUtils {
    */
   public static List<Path> convertToAbsolutePaths(List<Path> paths, String baseDir) {
     if (!paths.isEmpty()) {
-      List<Path> absolutePaths = Lists.newArrayList();
+      List<Path> absolutePaths = new ArrayList<>();
       for (Path relativePath : paths) {
-        Path absolutePath = (relativePath.isAbsolute()) ? relativePath
-            : new Path(baseDir, relativePath);
+        Path absolutePath = (relativePath.isAbsolute()) ? relativePath : new Path(baseDir, relativePath);
         absolutePaths.add(absolutePath);
       }
       return absolutePaths;
@@ -62,9 +60,10 @@ public class MetadataPathUtils {
    * @param baseDir base parent directory
    * @return list of files with absolute paths
    */
-  public static List<? extends ParquetFileMetadata> convertToFilesWithAbsolutePaths(List<? extends ParquetFileMetadata> files, String baseDir) {
+  public static List<? extends ParquetFileMetadata> convertToFilesWithAbsolutePaths(
+      List<? extends ParquetFileMetadata> files, String baseDir) {
     if (!files.isEmpty()) {
-      List<ParquetFileMetadata> filesWithAbsolutePaths = Lists.newArrayList();
+      List<ParquetFileMetadata> filesWithAbsolutePaths = new ArrayList<>();
       for (ParquetFileMetadata file : files) {
         Path relativePath = file.getPath();
         ParquetFileMetadata fileWithAbsolutePath = null;
@@ -97,7 +96,7 @@ public class MetadataPathUtils {
     for (Path directory : tableMetadataWithAbsolutePaths.getDirectories()) {
       directoriesWithRelativePaths.add(relativize(baseDir, directory));
     }
-    List<ParquetFileMetadata_v4> filesWithRelativePaths = Lists.newArrayList();
+    List<ParquetFileMetadata_v4> filesWithRelativePaths = new ArrayList<>();
     for (ParquetFileMetadata_v4 file : (List<ParquetFileMetadata_v4>) tableMetadataWithAbsolutePaths.getFiles()) {
       filesWithRelativePaths.add(new ParquetFileMetadata_v4(
           relativize(baseDir, file.getPath()), file.length, file.rowGroups));
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 4367489..be79b41 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -347,7 +347,6 @@
             <!-- Relocate Drill classes to minimize classloader hell. -->
             <relocation><pattern>org.apache.drill.exec.</pattern><shadedPattern>oadd.org.apache.drill.exec.</shadedPattern></relocation>
             <relocation><pattern>org.apache.drill.common.</pattern><shadedPattern>oadd.org.apache.drill.common.</shadedPattern></relocation>
-            <relocation><pattern>org.apache.drill.metastore.</pattern><shadedPattern>oadd.org.apache.drill.metastore.</shadedPattern></relocation>
 
             <!-- Move dependencies out of path -->
             <relocation><pattern>antlr.</pattern><shadedPattern>oadd.antlr.</shadedPattern></relocation>
@@ -489,6 +488,7 @@
                <exclude>org/apache/commons/pool2/**</exclude>
                <exclude>org/apache/http/**</exclude>
                <exclude>org/apache/directory/**</exclude>
+               <exclude>org/apache/drill/metastore/**</exclude>
                <exclude>com/jcraft/**</exclude>
                <exclude>**/mapr/**</exclude>
                <exclude>org/yaml/**</exclude>
@@ -664,7 +664,6 @@
                   <!-- Relocate Drill classes to minimize classloader hell. -->
                   <relocation><pattern>org.apache.drill.exec.</pattern><shadedPattern>oadd.org.apache.drill.exec.</shadedPattern></relocation>
                   <relocation><pattern>org.apache.drill.common.</pattern><shadedPattern>oadd.org.apache.drill.common.</shadedPattern></relocation>
-                  <relocation><pattern>org.apache.drill.metastore.</pattern><shadedPattern>oadd.org.apache.drill.metastore.</shadedPattern></relocation>
 
                   <!-- Move dependencies out of path -->
                   <relocation><pattern>antlr.</pattern><shadedPattern>oadd.antlr.</shadedPattern></relocation>
@@ -788,6 +787,7 @@
                       <exclude>org/apache/drill/exec/rpc/data/**</exclude>
                       <exclude>org/apache/drill/exec/rpc/control/**</exclude>
                       <exclude>org/apache/drill/exec/work/**</exclude>
+                      <exclude>org/apache/drill/metastore/**</exclude>
                       <exclude>org/apache/hadoop/**</exclude>
                       <exclude>org/apache/commons/pool2/**</exclude>
                       <exclude>org/apache/http/**</exclude>
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillMetaImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillMetaImpl.java
index 2b61a6a..e06445d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillMetaImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillMetaImpl.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import javax.validation.constraints.NotNull;
 
@@ -60,9 +61,7 @@ import org.apache.drill.exec.proto.UserProtos.TableMetadata;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RpcException;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 public class DrillMetaImpl extends MetaImpl {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillMetaImpl.class);
@@ -315,18 +314,14 @@ public class DrillMetaImpl extends MetaImpl {
       }
 
       try {
-        List<Object> tables = Lists.transform(getResult(response), new Function<ResponseValue, Object>() {
-          @Override
-          public Object apply(ResponseValue input) {
-            return adapt(input);
-          }
-        });
+        List<Object> tables = getResult(response).stream()
+            .map(this::adapt)
+            .collect(Collectors.toList());
 
         Meta.Frame frame = Meta.Frame.create(0, true, tables);
         StructType fieldMetaData = drillFieldMetaData(clazz);
-        Meta.Signature signature = Meta.Signature.create(
-            fieldMetaData.columns, "",
-            Collections.<AvaticaParameter>emptyList(), CursorFactory.record(clazz), Meta.StatementType.SELECT);
+        Meta.Signature signature = Meta.Signature.create(fieldMetaData.columns, "", Collections.emptyList(),
+            CursorFactory.record(clazz), Meta.StatementType.SELECT);
 
         AvaticaStatement statement = connection.createStatement();
         return MetaResultSet.create(connection.id, statement.getId(), true,
diff --git a/metastore/file-metadata/pom.xml b/metastore/file-metadata/pom.xml
new file mode 100644
index 0000000..d0bd794
--- /dev/null
+++ b/metastore/file-metadata/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.drill.metastore</groupId>
+    <artifactId>metastore-parent</artifactId>
+    <version>1.17.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-file-metastore-plugin</artifactId>
+  <name>Drill File Metadata Metastore Plugin</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProvider.java b/metastore/file-metadata/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProvider.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProvider.java
rename to metastore/file-metadata/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProvider.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProviderBuilder.java b/metastore/file-metadata/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProviderBuilder.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProviderBuilder.java
rename to metastore/file-metadata/src/main/java/org/apache/drill/exec/physical/base/TableMetadataProviderBuilder.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/FileMetadata.java b/metastore/file-metadata/src/main/java/org/apache/drill/metastore/FileMetadata.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/FileMetadata.java
rename to metastore/file-metadata/src/main/java/org/apache/drill/metastore/FileMetadata.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/FileTableMetadata.java b/metastore/file-metadata/src/main/java/org/apache/drill/metastore/FileTableMetadata.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/FileTableMetadata.java
rename to metastore/file-metadata/src/main/java/org/apache/drill/metastore/FileTableMetadata.java
diff --git a/metastore/metastore-api/pom.xml b/metastore/metastore-api/pom.xml
new file mode 100644
index 0000000..d3f3aed
--- /dev/null
+++ b/metastore/metastore-api/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.drill.metastore</groupId>
+    <artifactId>metastore-parent</artifactId>
+    <version>1.17.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-metastore-api</artifactId>
+  <name>Drill Metastore API</name>
+
+</project>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExactStatisticsConstants.java b/metastore/metastore-api/src/main/java/org/apache/drill/exec/expr/ExactStatisticsConstants.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExactStatisticsConstants.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/exec/expr/ExactStatisticsConstants.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java b/metastore/metastore-api/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
similarity index 54%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
index 8f5392a..5794a13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
@@ -20,27 +20,29 @@ package org.apache.drill.exec.physical.impl.statistics;
 /*
  * Base Statistics class - all statistics classes should extend this class
  */
-public abstract class Statistic {
+public interface Statistic {
   /*
    * The lifecycle states for statistics
    */
-  public enum State {INIT, CONFIG, MERGE, COMPLETE};
+  enum State {INIT, CONFIG, MERGE, COMPLETE}
+
+  long NO_COLUMN_STATS = -1;
   /*
    * List of statistics used in Drill.
    */
-  public static final String COLNAME = "column";
-  public static final String COLTYPE = "majortype";
-  public static final String SCHEMA = "schema";
-  public static final String COMPUTED = "computed";
-  public static final String ROWCOUNT = "rowcount";
-  public static final String NNROWCOUNT = "nonnullrowcount";
-  public static final String NDV = "approx_count_distinct";
-  public static final String HLL_MERGE = "hll_merge";
-  public static final String HLL = "hll";
-  public static final String AVG_WIDTH = "avg_width";
-  public static final String SUM_WIDTH = "sum_width";
-  public static final String CNT_DUPS = "approx_count_dups";
-  public static final String SUM_DUPS = "sum";
-  public static final String TDIGEST = "tdigest";
-  public static final String TDIGEST_MERGE = "tdigest_merge";
+  String COLNAME = "column";
+  String COLTYPE = "majortype";
+  String SCHEMA = "schema";
+  String COMPUTED = "computed";
+  String ROWCOUNT = "rowcount";
+  String NNROWCOUNT = "nonnullrowcount";
+  String NDV = "approx_count_distinct";
+  String HLL_MERGE = "hll_merge";
+  String HLL = "hll";
+  String AVG_WIDTH = "avg_width";
+  String SUM_WIDTH = "sum_width";
+  String CNT_DUPS = "approx_count_dups";
+  String SUM_DUPS = "sum";
+  String TDIGEST = "tdigest";
+  String TDIGEST_MERGE = "tdigest_merge";
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java
similarity index 55%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java
index 085c68c..5c39701 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/exec/record/metadata/SchemaPathUtils.java
@@ -17,11 +17,8 @@
  */
 package org.apache.drill.exec.record.metadata;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.record.MaterializedField;
 
 public class SchemaPathUtils {
 
@@ -50,36 +47,5 @@ public class SchemaPathUtils {
     return colMetadata;
   }
 
-  /**
-   * Adds column with specified schema path and type into specified {@code TupleMetadata schema}.
-   *
-   * @param schema     tuple schema where column should be added
-   * @param schemaPath schema path of the column which should be added
-   * @param type       type of the column which should be added
-   */
-  public static void addColumnMetadata(TupleMetadata schema, SchemaPath schemaPath, TypeProtos.MajorType type) {
-    PathSegment.NameSegment colPath = schemaPath.getUnIndexed().getRootSegment();
-    ColumnMetadata colMetadata;
 
-    while (!colPath.isLastPath()) {
-      colMetadata = schema.metadata(colPath.getPath());
-      if (colMetadata == null) {
-        colMetadata = MetadataUtils.newMap(colPath.getPath(), null);
-        schema.addColumn(colMetadata);
-      }
-      if (!colMetadata.isMap()) {
-        throw new DrillRuntimeException(String.format("Expected map, but was %s", colMetadata.majorType()));
-      }
-
-      schema = colMetadata.mapSchema();
-      colPath = (PathSegment.NameSegment) colPath.getChild();
-    }
-
-    colMetadata = schema.metadata(colPath.getPath());
-    if (colMetadata == null) {
-      schema.addColumn(new PrimitiveColumnMetadata(MaterializedField.create(colPath.getPath(), type)));
-    } else if (!colMetadata.majorType().equals(type)) {
-      throw new DrillRuntimeException(String.format("Types mismatch: existing type: %s, new type: %s", colMetadata.majorType(), type));
-    }
-  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/BaseMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/BaseMetadata.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/BaseMetadata.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/BaseMetadata.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/CollectableColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/CollectableColumnStatisticsKind.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/CollectableColumnStatisticsKind.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/CollectableColumnStatisticsKind.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/CollectableTableStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/CollectableTableStatisticsKind.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/CollectableTableStatisticsKind.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/CollectableTableStatisticsKind.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatistics.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatistics.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatistics.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatistics.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatisticsImpl.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsImpl.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatisticsImpl.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsImpl.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java
similarity index 96%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java
index e94b842..8ee63ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/ColumnStatisticsKind.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.metastore;
 
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.expr.ExactStatisticsConstants;
 import org.apache.drill.exec.physical.impl.statistics.Statistic;
 
@@ -38,8 +37,8 @@ public enum ColumnStatisticsKind implements CollectableColumnStatisticsKind {
       long nullsCount = 0;
       for (ColumnStatistics statistics : statisticsList) {
         Long statNullsCount = (Long) statistics.getStatistic(this);
-        if (statNullsCount == null || statNullsCount == GroupScan.NO_COLUMN_STATS) {
-          return GroupScan.NO_COLUMN_STATS;
+        if (statNullsCount == null || statNullsCount == Statistic.NO_COLUMN_STATS) {
+          return Statistic.NO_COLUMN_STATS;
         } else {
           nullsCount += statNullsCount;
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/LocationProvider.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/LocationProvider.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/LocationProvider.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/LocationProvider.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/PartitionMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/PartitionMetadata.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/PartitionMetadata.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/PartitionMetadata.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/RowGroupMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/RowGroupMetadata.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/RowGroupMetadata.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/RowGroupMetadata.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/StatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/StatisticsKind.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/StatisticsKind.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/StatisticsKind.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/TableMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/TableMetadata.java
similarity index 100%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/TableMetadata.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/TableMetadata.java
diff --git a/exec/java-exec/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java
similarity index 94%
rename from exec/java-exec/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java
rename to metastore/metastore-api/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java
index 63b9243..1c10938 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/TableStatisticsKind.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.metastore;
 
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.expr.ExactStatisticsConstants;
 import org.apache.drill.exec.physical.impl.statistics.Statistic;
 
@@ -37,8 +36,8 @@ public enum TableStatisticsKind implements CollectableTableStatisticsKind {
       long rowCount = 0;
       for (BaseMetadata statistic : statistics) {
         Long statRowCount = getValue(statistic);
-        if (statRowCount == null || statRowCount == GroupScan.NO_COLUMN_STATS) {
-          rowCount = GroupScan.NO_COLUMN_STATS;
+        if (statRowCount == null || statRowCount == Statistic.NO_COLUMN_STATS) {
+          rowCount = Statistic.NO_COLUMN_STATS;
           break;
         } else {
           rowCount += statRowCount;
@@ -50,7 +49,7 @@ public enum TableStatisticsKind implements CollectableTableStatisticsKind {
     @Override
     public Long getValue(BaseMetadata metadata) {
       Long rowCount = (Long) metadata.getStatistic(this);
-      return rowCount != null ? rowCount : GroupScan.NO_COLUMN_STATS;
+      return rowCount != null ? rowCount : Statistic.NO_COLUMN_STATS;
     }
 
     @Override
diff --git a/metastore/pom.xml b/metastore/pom.xml
new file mode 100644
index 0000000..2558e11
--- /dev/null
+++ b/metastore/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.drill</groupId>
+    <artifactId>drill-root</artifactId>
+    <version>1.17.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.drill.metastore</groupId>
+  <artifactId>metastore-parent</artifactId>
+  <packaging>pom</packaging>
+  <name>metastore/Parent Pom</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-logical</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>vector</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <modules>
+    <module>metastore-api</module>
+    <module>file-metadata</module>
+  </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index ac9af89..2080964 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3586,5 +3586,6 @@
     <module>exec</module>
     <module>drill-yarn</module>
     <module>distribution</module>
+    <module>metastore</module>
   </modules>
 </project>


[drill] 06/08: DRILL-7225: Fixed merging ColumnTypeInfo for files with different schemas closes #1773

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5862a39468773bba8aed51cc740b0ed9a8909fb4
Author: Venkata Jyothsna Donapati <jy...@gmail.com>
AuthorDate: Mon Apr 29 12:27:44 2019 -0700

    DRILL-7225: Fixed merging ColumnTypeInfo for files with different schemas
    closes #1773
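    For illustration, a minimal SQL sequence for the scenario covered by the new test below
    (workspace, table and file names are taken from that test): two CTAS targets with different
    schemas under one table directory, followed by a metadata refresh that previously could fail
    while merging ColumnTypeInfo for a column present in only one of the files.

        USE dfs;
        CREATE TABLE `orders_nation_ctas/t1` AS SELECT * FROM cp.`tpch/orders.parquet`;
        CREATE TABLE `orders_nation_ctas/t2` AS SELECT * FROM cp.`tpch/nation.parquet`;
        -- merges metadata cache entries for both sub-tables despite their different schemas
        REFRESH TABLE METADATA orders_nation_ctas;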
---
 .../drill/exec/store/parquet/metadata/Metadata.java     |  4 ++++
 .../exec/store/parquet/TestParquetMetadataCache.java    | 17 +++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 5459a8a..b239be7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -321,6 +321,10 @@ public class Metadata {
         Map<ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = parquetFileAndRowCountMetadata.getTotalNullCountMap();
         for (ColumnTypeMetadata_v4.Key column: totalNullCountMap.keySet()) {
           ColumnTypeMetadata_v4 columnTypeMetadata_v4 = columnTypeInfoSet.get(column);
+          // If the column is not present in columnTypeInfoSet, get it from parquetTableMetadata
+          if (columnTypeMetadata_v4 == null) {
+            columnTypeMetadata_v4 = parquetTableMetadata.getColumnTypeInfoMap().get(column);
+          }
           // If the existing total null count or the null count of the child file is unknown(-1), update the total null count
           // as unknown
           if ( columnTypeMetadata_v4.totalNullCount < 0 || totalNullCountMap.get(column) < 0) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 8280061..5799299 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -1385,4 +1385,21 @@ public class TestParquetMetadataCache extends PlanTestBase {
         .go();
   }
 
+  @Test
+  public void testRefreshSchemaChange() throws Exception {
+    String tableName = "orders_nation_ctas";
+    test("use dfs");
+
+    test("create table `%s/t1` as select * from cp.`tpch/orders.parquet`", tableName);
+    test("create table `%s/t2` as select * from cp.`tpch/nation.parquet`", tableName);
+    String query = String.format("refresh table metadata %s", tableName);
+
+    testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("ok", "summary")
+            .baselineValues(true, "Successfully updated metadata for table orders_nation_ctas.")
+            .go();
+    checkForMetadataFile(tableName);
+  }
 }


[drill] 02/08: DRILL-7167: Implemented DESCRIBE TABLE statement

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 878cfc033de4343e23f1422bb8845d5c6d892618
Author: Dmytriy Grinchenko <dm...@gmail.com>
AuthorDate: Fri Apr 12 10:24:17 2019 +0300

    DRILL-7167: Implemented DESCRIBE TABLE statement
    
    - altered parser implementation to honor DESCRIBE TABLE syntax
    - extended test coverage to check the new statement
    closes #1747
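    For illustration, the DESCRIBE forms accepted after this change; the statements mirror the
    INFORMATION_SCHEMA examples used in the tests below, with the optional TABLE keyword being
    the new part:

        DESCRIBE INFORMATION_SCHEMA.`TABLES`;                       -- existing form
        DESCRIBE TABLE INFORMATION_SCHEMA.`TABLES`;                 -- new: optional TABLE keyword
        DESCRIBE TABLE INFORMATION_SCHEMA.`TABLES` TABLE_CATALOG;   -- restricted to one column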
---
 .../src/main/codegen/includes/parserImpls.ftl      |  3 +-
 .../org/apache/drill/exec/sql/TestInfoSchema.java  | 62 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 9ba92f2..25ee72b 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -106,7 +106,7 @@ SqlNode SqlShowSchemas() :
 
 /**
  * Parses statement
- *   { DESCRIBE | DESC } tblname [col_name | wildcard ]
+ *   { DESCRIBE | DESC } [TABLE] tblname [col_name | wildcard ]
  */
 SqlNode SqlDescribeTable() :
 {
@@ -117,6 +117,7 @@ SqlNode SqlDescribeTable() :
 }
 {
     (<DESCRIBE> | <DESC>) { pos = getPos(); }
+    (<TABLE>)?
     table = CompoundIdentifier()
     (
         column = CompoundIdentifier()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index ee89c5c..f437776 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -184,6 +184,16 @@ public class TestInfoSchema extends BaseTestQuery {
   }
 
   @Test
+  public void describeTableWithTableKeyword() throws Exception {
+    test("USE INFORMATION_SCHEMA");
+    testBuilder()
+        .sqlQuery("DESCRIBE TABLE CATALOGS")
+        .unOrdered()
+        .sqlBaselineQuery("DESCRIBE CATALOGS")
+        .go();
+  }
+
+  @Test
   public void describeTableWithSchema() throws Exception{
     testBuilder()
         .sqlQuery("DESCRIBE INFORMATION_SCHEMA.`TABLES`")
@@ -197,6 +207,15 @@ public class TestInfoSchema extends BaseTestQuery {
   }
 
   @Test
+  public void describeTableWithSchemaAndTableKeyword() throws Exception {
+    testBuilder()
+        .sqlQuery("DESCRIBE TABLE INFORMATION_SCHEMA.`TABLES`")
+        .unOrdered()
+        .sqlBaselineQuery("DESCRIBE INFORMATION_SCHEMA.`TABLES`")
+        .go();
+  }
+
+  @Test
   public void describeWhenSameTableNameExistsInMultipleSchemas() throws Exception{
     try {
       test("USE dfs.tmp");
@@ -220,7 +239,29 @@ public class TestInfoSchema extends BaseTestQuery {
           .baselineValues("TABLE_TYPE", "CHARACTER VARYING", "NO")
           .go();
     } finally {
-      test("DROP VIEW dfs.tmp.`TABLES`");
+      test("DROP VIEW IF EXISTS dfs.tmp.`TABLES`");
+    }
+  }
+
+  @Test
+  public void describeWhenSameTableNameExistsInMultipleSchemasWithTableKeyword() throws Exception {
+    try {
+      test("USE dfs.tmp");
+      test("CREATE OR REPLACE VIEW `TABLES` AS SELECT full_name FROM cp.`employee.json`");
+
+      testBuilder()
+          .sqlQuery("DESCRIBE TABLE `TABLES`")
+          .unOrdered()
+          .sqlBaselineQuery("DESCRIBE `TABLES`")
+          .go();
+
+      testBuilder()
+          .sqlQuery("DESCRIBE TABLE INFORMATION_SCHEMA.`TABLES`")
+          .unOrdered()
+          .sqlBaselineQuery("DESCRIBE INFORMATION_SCHEMA.`TABLES`")
+          .go();
+    } finally {
+      test("DROP VIEW IF EXISTS dfs.tmp.`TABLES`");
     }
   }
 
@@ -236,6 +277,16 @@ public class TestInfoSchema extends BaseTestQuery {
   }
 
   @Test
+  public void describeTableWithColumnNameAndTableKeyword() throws Exception {
+    test("USE INFORMATION_SCHEMA");
+    testBuilder()
+        .sqlQuery("DESCRIBE TABLE `TABLES` TABLE_CATALOG")
+        .unOrdered()
+        .sqlBaselineQuery("DESCRIBE `TABLES` TABLE_CATALOG")
+        .go();
+  }
+
+  @Test
   public void describeTableWithSchemaAndColumnName() throws Exception{
     testBuilder()
         .sqlQuery("DESCRIBE INFORMATION_SCHEMA.`TABLES` TABLE_CATALOG")
@@ -246,6 +297,15 @@ public class TestInfoSchema extends BaseTestQuery {
   }
 
   @Test
+  public void describeTableWithSchemaAndColumnNameAndTableKeyword() throws Exception {
+    testBuilder()
+        .sqlQuery("DESCRIBE TABLE INFORMATION_SCHEMA.`TABLES` TABLE_CATALOG")
+        .unOrdered()
+        .sqlBaselineQuery("DESCRIBE INFORMATION_SCHEMA.`TABLES` TABLE_CATALOG")
+        .go();
+  }
+
+  @Test
   public void describeTableWithColQualifier() throws Exception{
     testBuilder()
         .sqlQuery("DESCRIBE COLUMNS 'TABLE%'")


[drill] 05/08: DRILL-6974: SET option command modification

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit caa983105b0c6ea92770f0b86d7e28d457cb0bae
Author: Dmytriy Grinchenko <dm...@gmail.com>
AuthorDate: Wed Apr 17 12:23:34 2019 +0300

    DRILL-6974: SET option command modification
    
    - ALTER ... RESET ... and ALTER ... SET ... sub-parsers separated into two
      different SqlCall classes with the same parent SqlSetOption
    - parserImpls modified to handle the new syntax of the ALTER ... SET ...
      expression:
      a) ALTER ... SET option.name = option.value: sets the option value
      b) ALTER ... SET option.name: displays the current option value
    - Handler for SqlSetOption split into SetOptionHandler and
      ResetOptionHandler to better represent the handled statements
    - Base abstract class AbstractSqlSetHandler created to avoid repeating
      the shared implementation of common functions
    - SetOptionHandler covered with unit tests for each statement
      form.
    
    Fix issues stated in the review
    closes #1763
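    For reference, a sketch of the statement forms the separated parsers and handlers now cover.
    The option name `planner.slice_target` is only an illustrative example and is not part of
    this change:

        ALTER SESSION SET `planner.slice_target` = 10;   -- sets an option (SetOptionHandler)
        ALTER SESSION SET `planner.slice_target`;         -- new: displays the current value
        ALTER SESSION RESET `planner.slice_target`;       -- resets one option (ResetOptionHandler)
        ALTER SESSION RESET ALL;                          -- resets all options in the scope
        SET `planner.slice_target` = 10;                  -- scope defaults to SESSION when omitted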
---
 exec/java-exec/src/main/codegen/data/Parser.tdd    |   6 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  63 +++++++++++
 .../drill/exec/planner/sql/DrillSqlWorker.java     |  22 ++--
 .../sql/handlers/AbstractSqlSetHandler.java        |  86 +++++++++++++++
 .../planner/sql/handlers/ResetOptionHandler.java   |  73 +++++++++++++
 .../planner/sql/handlers/SetOptionHandler.java     | 119 ++++++++-------------
 .../sql/parser/CompoundIdentifierConverter.java    |   4 +-
 .../planner/sql/parser/DrillSqlResetOption.java    | 103 ++++++++++++++++++
 .../exec/planner/sql/parser/DrillSqlSetOption.java |  75 +++++++++++++
 .../sql/handlers/ResetOptionHandlerTest.java       |  64 +++++++++++
 .../planner/sql/handlers/SetOptionHandlerTest.java |  90 ++++++++++++++++
 11 files changed, 623 insertions(+), 82 deletions(-)

diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd
index 3580388..df97e24 100644
--- a/exec/java-exec/src/main/codegen/data/Parser.tdd
+++ b/exec/java-exec/src/main/codegen/data/Parser.tdd
@@ -59,7 +59,9 @@
     "SqlRefreshMetadata()",
     "SqlCreateFunction()",
     "SqlDropFunction()",
-    "SqlAnalyzeTable()"
+    "SqlAnalyzeTable()",
+    "DrillSqlSetOption(Span.of(), null)",
+    "DrillSqlResetOption(Span.of(), null)"
   ]
 
   # List of methods for parsing custom literals.
@@ -87,6 +89,8 @@
   # List of methods for parsing extensions to "ALTER <scope>" calls.
   # Each must accept arguments "(SqlParserPos pos, String scope)".
   alterStatementParserMethods: [
+    "DrillSqlSetOption",
+    "DrillSqlResetOption"
   ]
 
   # List of methods for parsing extensions to "DROP" calls.
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 25ee72b..f90859a 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -634,3 +634,66 @@ SqlNode SqlAnalyzeTable() :
         return new SqlAnalyzeTable(pos, tblName, estimate, fieldList, percent);
     }
 }
+
+
+/**
+ * Parses a SET statement without a leading "ALTER <SCOPE>":
+ *
+ * SET &lt;NAME&gt; [ = VALUE ]
+ * <p>
+ * Statement is handled in: {@link SetOptionHandler}
+ */
+DrillSqlSetOption DrillSqlSetOption(Span s, String scope) :
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlNode val = null;
+}
+{
+    <SET> {
+        s.add(this);
+    }
+    name = CompoundIdentifier()
+    (
+        <EQ>
+        (
+            val = Literal()
+        |
+            val = SimpleIdentifier()
+        )
+    )?
+    {
+      pos = (val == null) ? s.end(name) : s.end(val);
+
+      return new DrillSqlSetOption(pos, scope, name, val);
+    }
+}
+
+/**
+ * Parses a RESET statement without a leading "ALTER <SCOPE>":
+ *
+ *  RESET { <NAME> | ALL }
+ * <p>
+ * Statement is handled in: {@link ResetOptionHandler}
+ */
+DrillSqlResetOption DrillSqlResetOption(Span s, String scope) :
+{
+    SqlIdentifier name;
+}
+{
+    <RESET> {
+        s.add(this);
+    }
+    (
+        name = CompoundIdentifier()
+    |
+        <ALL> {
+            name = new SqlIdentifier(token.image.toUpperCase(Locale.ROOT), getPos());
+        }
+    )
+    {
+        return new DrillSqlResetOption(s.end(name), scope, name);
+    }
+}
+
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 09fbbdc..5ce9c19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -41,11 +41,14 @@ import org.apache.drill.exec.planner.sql.handlers.DescribeSchemaHandler;
 import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler;
 import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
 import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
+import org.apache.drill.exec.planner.sql.handlers.ResetOptionHandler;
 import org.apache.drill.exec.planner.sql.handlers.SchemaHandler;
 import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
 import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
+import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
+import org.apache.drill.exec.planner.sql.parser.DrillSqlSetOption;
 import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
 import org.apache.drill.exec.planner.sql.parser.SqlSchema;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -150,15 +153,22 @@ public class DrillSqlWorker {
     final AbstractSqlHandler handler;
     final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);
 
-    switch(sqlNode.getKind()) {
+    switch (sqlNode.getKind()) {
       case EXPLAIN:
         handler = new ExplainHandler(config, textPlan);
         context.setSQLStatementType(SqlStatementType.EXPLAIN);
         break;
       case SET_OPTION:
-        handler = new SetOptionHandler(context);
-        context.setSQLStatementType(SqlStatementType.SETOPTION);
-        break;
+        if (sqlNode instanceof DrillSqlSetOption) {
+          handler = new SetOptionHandler(context);
+          context.setSQLStatementType(SqlStatementType.SETOPTION);
+          break;
+        }
+        if (sqlNode instanceof DrillSqlResetOption) {
+          handler = new ResetOptionHandler(context);
+          context.setSQLStatementType(SqlStatementType.SETOPTION);
+          break;
+        }
       case DESCRIBE_TABLE:
         if (sqlNode instanceof DrillSqlDescribeTable) {
           handler = new DescribeTableHandler(config);
@@ -184,8 +194,8 @@ public class DrillSqlWorker {
       case DROP_VIEW:
       case OTHER_DDL:
       case OTHER:
-        if(sqlNode instanceof SqlCreateTable) {
-          handler = ((DrillSqlCall)sqlNode).getSqlHandler(config, textPlan);
+        if (sqlNode instanceof SqlCreateTable) {
+          handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan);
           context.setSQLStatementType(SqlStatementType.CTAS);
           break;
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlSetHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlSetHandler.java
new file mode 100644
index 0000000..71d7636
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AbstractSqlSetHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.sql.SqlSetOption;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.QueryOptionManager;
+import org.apache.drill.exec.util.ImpersonationUtil;
+
+/**
+ * Base handler for SQL_SET kind statements.
+ */
+abstract class AbstractSqlSetHandler extends AbstractSqlHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSqlHandler.class);
+
+  final QueryContext context;
+
+  AbstractSqlSetHandler(QueryContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Extracts query {@link OptionValue.OptionScope} from the {@link SqlSetOption}.
+   * @param statement Statement object
+   * @param options Options object
+   * @return parsed query scope
+   */
+  OptionValue.OptionScope getScope(SqlSetOption statement, QueryOptionManager options) {
+    String scope = statement.getScope();
+
+    if (scope == null) {
+      return OptionValue.OptionScope.SESSION;
+    }
+
+    switch (scope.toLowerCase()) {
+      case "session":
+        if (options.getBoolean(ExecConstants.SKIP_ALTER_SESSION_QUERY_PROFILE)) {
+          logger.debug("Will not write profile for ALTER SESSION SET ... ");
+          context.skipWritingProfile(true);
+        }
+        return OptionValue.OptionScope.SESSION;
+      case "system":
+        return OptionValue.OptionScope.SYSTEM;
+      default:
+        throw UserException.validationError()
+            .message("Invalid OPTION scope %s. Scope must be SESSION or SYSTEM.", scope)
+            .build(logger);
+    }
+  }
+
+  /**
+   * Admin privileges checker.
+   * @param options Options object
+   */
+  void checkAdminPrivileges(QueryOptionManager options) {
+    if (context.isUserAuthenticationEnabled()
+        && !ImpersonationUtil.hasAdminPrivileges(
+            context.getQueryUserName(),
+            ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
+            ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options))) {
+
+      throw UserException
+          .permissionError()
+          .message("Not authorized to change SYSTEM options.")
+          .build(logger);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandler.java
new file mode 100644
index 0000000..8f99192
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionScope;
+import org.apache.drill.exec.server.options.QueryOptionManager;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSetOption;
+
+/**
+ * Converts a {@link SqlNode} representing: "ALTER .. RESET option | ALL" statement to a {@link PhysicalPlan}.
+ * See {@link DrillSqlResetOption}.
+ * <p>
+ * These statements have side effects, i.e. the options within the system context or the session context
+ * are modified. The resulting {@link DirectPlan} returns to the client a string with the name of the
+ * option that was updated.
+ */
+public class ResetOptionHandler extends AbstractSqlSetHandler {
+
+  /**
+   * Class constructor.
+   * @param context Context of the Query
+   */
+  public ResetOptionHandler(QueryContext context) {
+    super(context);
+  }
+
+  /**
+   * Handles {@link DrillSqlResetOption} query
+   */
+  @Override
+  public final PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
+    QueryOptionManager options = context.getOptions();
+    SqlSetOption statement = unwrap(sqlNode, SqlSetOption.class);
+    OptionScope optionScope = getScope(statement, context.getOptions());
+
+    if (optionScope == OptionValue.OptionScope.SYSTEM) {
+      checkAdminPrivileges(options);
+    }
+
+    OptionManager optionManager = options.getOptionManager(optionScope);
+    String optionName = statement.getName().toString();
+
+    if ("ALL".equalsIgnoreCase(optionName)) {
+      optionManager.deleteAllLocalOptions();
+    } else {
+      optionManager.deleteLocalOption(optionName);
+    }
+    return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", optionName));
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index 8a6a30c..41a1b78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -20,108 +20,68 @@ package org.apache.drill.exec.planner.sql.handlers;
 import java.math.BigDecimal;
 
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.ValidationException;
-
 import org.apache.calcite.util.NlsString;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.parser.DrillSqlSetOption;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.OptionScope;
-import org.apache.drill.exec.server.options.QueryOptionManager;
-import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlSetOption;
 
 /**
- * Converts a {@link SqlNode} representing "ALTER .. SET option = value" and "ALTER ... RESET ..." statements to a
- * {@link PhysicalPlan}. See {@link SqlSetOption}. These statements have side effects i.e. the options within the
- * system context or the session context are modified. The resulting {@link DirectPlan} returns to the client a string
- * that is the name of the option that was updated.
+ * Converts a {@link SqlNode} representing: "ALTER .. SET option = value" or "ALTER ... SET option"
+ * statement to a {@link PhysicalPlan}. See {@link DrillSqlSetOption}
+ * <p>
+ * These statements have side effects, i.e. the options within the system context or the session context
+ * are modified. The resulting {@link DirectPlan} returns to the client either the name of the option
+ * that was updated, or the current value of the option when no value was supplied.
  */
-public class SetOptionHandler extends AbstractSqlHandler {
+public class SetOptionHandler extends AbstractSqlSetHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
 
-  private final QueryContext context;
-
   public SetOptionHandler(QueryContext context) {
-    this.context = context;
+    super(context);
   }
 
+  /**
+   * Handles {@link DrillSqlSetOption} query
+   */
   @Override
-  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, ForemanSetupException {
-    final SqlSetOption option = unwrap(sqlNode, SqlSetOption.class);
-    final SqlNode value = option.getValue();
-    if (value != null && !(value instanceof SqlLiteral)) {
-      throw UserException.validationError()
-          .message("Drill does not support assigning non-literal values in SET statements.")
-          .build(logger);
-    }
+  public final PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
+    // sqlNode may be either a DrillSqlSetOption or a DrillSqlResetOption, depending on the parsed statement
+    SqlSetOption statement = unwrap(sqlNode, SqlSetOption.class);
+    OptionScope optionScope = getScope(statement, context.getOptions());
+    OptionManager optionManager = context.getOptions().getOptionManager(optionScope);
+
+    String optionName = statement.getName().toString();
+    SqlNode optionValue = statement.getValue();
+
+    if (optionValue == null) {
+      String value = String.valueOf(optionManager.getOption(optionName).getValue());
 
-    final QueryOptionManager options = context.getOptions();
-    final String scope = option.getScope();
-    final OptionValue.OptionScope optionScope;
-    if (scope == null) { // No scope mentioned assumed SESSION
-      optionScope = OptionScope.SESSION;
+      return DirectPlan.createDirectPlan(context, new SetOptionViewResult(optionName, value));
     } else {
-      switch (scope.toLowerCase()) {
-      case "session":
-        optionScope = OptionScope.SESSION;
-        // Skip writing profiles for "ALTER SESSION SET" queries
-        if (options.getBoolean(ExecConstants.SKIP_ALTER_SESSION_QUERY_PROFILE)) {
-          logger.debug("Will not write profile for ALTER SESSION SET ... ");
-          context.skipWritingProfile(true);
-        }
-        break;
-      case "system":
-        optionScope = OptionScope.SYSTEM;
-        break;
-      default:
+      if (optionScope == OptionValue.OptionScope.SYSTEM) {
+        checkAdminPrivileges(context.getOptions());
+      }
+      if (!(optionValue instanceof SqlLiteral)) {
         throw UserException.validationError()
-            .message("Invalid OPTION scope %s. Scope must be SESSION or SYSTEM.", scope)
+            .message("Drill does not support assigning non-literal values in SET statements.")
             .build(logger);
       }
-    }
+      optionManager.setLocalOption(optionName, sqlLiteralToObject((SqlLiteral) optionValue));
 
-    if (optionScope == OptionScope.SYSTEM) {
-      // If the user authentication is enabled, make sure the user who is trying to change the system option has
-      // administrative privileges.
-      if (context.isUserAuthenticationEnabled() &&
-          !ImpersonationUtil.hasAdminPrivileges(
-            context.getQueryUserName(),
-            ExecConstants.ADMIN_USERS_VALIDATOR.getAdminUsers(options),
-            ExecConstants.ADMIN_USER_GROUPS_VALIDATOR.getAdminUserGroups(options))) {
-        throw UserException.permissionError()
-            .message("Not authorized to change SYSTEM options.")
-            .build(logger);
-      }
+      return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", optionName));
     }
-
-    final String optionName = option.getName().toString();
-
-    // Currently, we convert multi-part identifier to a string.
-    final OptionManager chosenOptions = options.getOptionManager(optionScope);
-
-    if (value != null) { // SET option
-      final Object literalObj = sqlLiteralToObject((SqlLiteral) value);
-      chosenOptions.setLocalOption(optionName, literalObj);
-    } else { // RESET option
-      if ("ALL".equalsIgnoreCase(optionName)) {
-        chosenOptions.deleteAllLocalOptions();
-      } else {
-        chosenOptions.deleteLocalOption(optionName);
-      }
-    }
-
-    return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", optionName));
   }
 
-  private static Object sqlLiteralToObject(final SqlLiteral literal) {
+  private static Object sqlLiteralToObject(SqlLiteral literal) {
     final Object object = literal.getValue();
     final SqlTypeName typeName = literal.getTypeName();
     switch (typeName) {
@@ -147,7 +107,7 @@ public class SetOptionHandler extends AbstractSqlHandler {
     case VARBINARY:
     case VARCHAR:
     case CHAR:
-      return ((NlsString) object).getValue().toString();
+      return ((NlsString) object).getValue();
 
     case BOOLEAN:
       return object;
@@ -158,4 +118,17 @@ public class SetOptionHandler extends AbstractSqlHandler {
         .build(logger);
     }
   }
+
+  /**
+   * Representation of "SET property.name" query result.
+   */
+  public static class SetOptionViewResult {
+    public String name;
+    public String value;
+
+    SetOptionViewResult(String name, String value) {
+      this.name = name;
+      this.value = value;
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index 119b27d..ac0d163 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -27,7 +27,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOrderBy;
 import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.SqlSetOption;
 import org.apache.calcite.sql.util.SqlShuttle;
 import org.apache.calcite.sql.util.SqlVisitor;
 
@@ -77,7 +76,8 @@ public class CompoundIdentifierConverter extends SqlShuttle {
         .put(SqlOrderBy.class, arrayOf(D, E, D, D))
         .put(SqlDropTable.class, arrayOf(D, D))
         .put(SqlRefreshMetadata.class, arrayOf(D, D, E))
-        .put(SqlSetOption.class, arrayOf(D, D, D))
+        .put(DrillSqlSetOption.class, arrayOf(D, D, D))
+        .put(DrillSqlResetOption.class, arrayOf(D, D))
         .put(SqlCreateFunction.class, arrayOf(D))
         .put(SqlDropFunction.class, arrayOf(D))
         .put(SqlSchema.Create.class, arrayOf(D, D, D, D, D, D))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlResetOption.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlResetOption.java
new file mode 100644
index 0000000..18c199c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlResetOption.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSetOption;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.drill.exec.planner.sql.handlers.ResetOptionHandler;
+
+/**
+ * Sql parse tree node to represent statement: {@code RESET { <NAME> | ALL } }.
+ * Statement handled in: {@link ResetOptionHandler}
+ */
+public final class DrillSqlResetOption extends SqlSetOption {
+
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("RESET_OPTION", SqlKind.SET_OPTION) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      SqlNode scopeNode = operands[0];
+      return new DrillSqlResetOption(pos, scopeNode == null ? null : scopeNode.toString(), (SqlIdentifier) operands[1]);
+    }
+  };
+
+  public DrillSqlResetOption(SqlParserPos pos, String scope, SqlIdentifier name) {
+    super(pos, scope, name, null);
+  }
+
+  @Override
+  public SqlKind getKind() {
+    return SqlKind.SET_OPTION;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    List<SqlNode> operandList = new ArrayList<>();
+
+    SqlIdentifier scopeIdentifier = (this.getScope() == null) ? null : new SqlIdentifier(this.getScope(),
+      SqlParserPos.ZERO);
+
+    operandList.add(scopeIdentifier);
+    operandList.add(this.getName());
+    return ImmutableNullableList.copyOf(operandList);
+  }
+
+  @Override
+  public void setOperand(int i, SqlNode operand) {
+    switch (i) {
+      case 0:
+        if (operand != null) {
+          this.setScope(((SqlIdentifier) operand).getSimple());
+        } else {
+          this.setScope(null);
+        }
+        break;
+      case 1:
+        this.setName((SqlIdentifier) operand);
+        break;
+      default:
+        throw new AssertionError(i);
+    }
+  }
+
+  @Override
+  protected void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("RESET");
+
+    SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.SIMPLE);
+
+    this.getName().unparse(writer, leftPrec, rightPrec);
+    writer.endList(frame);
+  }
+}
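
Usage illustration for this node (the option name is only an example): ALTER SESSION RESET `planner.slice_target` resets a single option back to its default at the session scope, and RESET ALL removes every option set at that scope. Both forms parse into DrillSqlResetOption and are dispatched to ResetOptionHandler.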
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlSetOption.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlSetOption.java
new file mode 100644
index 0000000..8cf5f77
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlSetOption.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSetOption;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler;
+
+/**
+ * Sql parse tree node to represent statement: {@code SET <NAME> [ = VALUE ]}.
+ * Statement handled in: {@link SetOptionHandler}
+ */
+public final class DrillSqlSetOption extends SqlSetOption {
+
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SET_OPTION", SqlKind.SET_OPTION) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      SqlNode scopeNode = operands[0];
+      String scope = scopeNode == null ? null : scopeNode.toString();
+      return new DrillSqlSetOption(pos, scope, (SqlIdentifier) operands[1], operands[2]);
+    }
+  };
+
+  public DrillSqlSetOption(SqlParserPos pos, String scope, SqlIdentifier name, SqlNode value) {
+    super(pos, scope, name, value);
+  }
+
+  @Override
+  public SqlKind getKind() {
+    return SqlKind.SET_OPTION;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  protected void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("SET");
+
+    SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.SIMPLE);
+    this.getName().unparse(writer, leftPrec, rightPrec);
+
+    if (this.getValue() != null) {
+      writer.sep("=");
+      this.getValue().unparse(writer, leftPrec, rightPrec);
+    }
+
+    writer.endList(frame);
+  }
+}
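
Usage illustration for the SET node (again with an example option name): ALTER SYSTEM SET `exec.errors.verbose` = true parses into DrillSqlSetOption with scope SYSTEM, and unparseAlterOperation above round-trips it back to a SET `exec.errors.verbose` = true form (exact quoting depends on the SqlWriter dialect). When the value is omitted, only SET `exec.errors.verbose` is emitted, which is the view-only form handled by SetOptionHandler.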
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandlerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandlerTest.java
new file mode 100644
index 0000000..cc3d4a2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/ResetOptionHandlerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SqlTest.class)
+public class ResetOptionHandlerTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+  }
+
+  @Test
+  public void testReset() throws Exception {
+    int defaultValue = Integer.valueOf(client.queryBuilder()
+        .sql("SELECT val from sys.options where name = '%s' limit 1", ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE)
+        .singletonString());
+
+    int testValue = defaultValue + 55;
+
+    try {
+      client.alterSession(ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE, testValue);
+      client.testBuilder()
+          .sqlQuery("select name, val from sys.options where name = '%s'", ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE)
+          .unOrdered()
+          .baselineColumns("name", "val")
+          .baselineValues(ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE, String.valueOf(testValue))
+          .go();
+
+      client.resetSession(ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE);
+      client.testBuilder()
+          .sqlQuery("select name, val from sys.options where name = '%s'", ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE)
+          .unOrdered()
+          .baselineColumns("name", "val")
+          .baselineValues(ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE, String.valueOf(defaultValue))
+          .go();
+    } finally {
+      client.resetSession(ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE);
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java
new file mode 100644
index 0000000..def1240
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandlerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.compile.ClassCompilerSelector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SqlTest.class)
+public class SetOptionHandlerTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+  }
+
+  @Test
+  public void testSimpleSetQuery() throws Exception {
+    String defaultValue = client.queryBuilder()
+        .sql("SELECT val from sys.options where name = '%s' limit 1",
+            ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION)
+        .singletonString();
+
+    boolean newValue = !Boolean.valueOf(defaultValue);
+    try {
+      client.alterSession(ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION, newValue);
+
+      String changedValue = client.queryBuilder()
+          .sql("SELECT val from sys.options where name = '%s' limit 1",
+              ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION)
+          .singletonString();
+
+      Assert.assertEquals(String.valueOf(newValue), changedValue);
+    } finally {
+      client.resetSession(ClassCompilerSelector.JAVA_COMPILER_DEBUG_OPTION);
+    }
+  }
+
+  @Test
+  public void testViewSetQuery() throws Exception {
+    client.testBuilder()  // BIT
+        .sqlQuery("SET `%s`", ExecConstants.ENABLE_ITERATOR_VALIDATION_OPTION)
+        .unOrdered()
+        .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
+            ExecConstants.ENABLE_ITERATOR_VALIDATION_OPTION)
+        .go();
+
+    client.testBuilder()  // BIGINT
+        .sqlQuery("SET `%s`", ExecConstants.OUTPUT_BATCH_SIZE)
+        .unOrdered()
+        .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
+            ExecConstants.OUTPUT_BATCH_SIZE)
+        .go();
+
+    client.testBuilder()  // FLOAT
+        .sqlQuery("SET `%s`", ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR)
+        .unOrdered()
+        .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
+            ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR)
+        .go();
+
+    client.testBuilder()  // VARCHAR
+        .sqlQuery("SET `%s`", ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL)
+        .unOrdered()
+        .sqlBaselineQuery("SELECT name, val as value FROM sys.options where name = '%s' limit 1",
+            ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL)
+        .go();
+  }
+}


[drill] 03/08: DRILL-7171: Create metadata directories cache file in the leaf level directories to support ConvertCountToDirectScan optimization. closes #1748

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit de74eabe013d9249712a14e1f0acabdc8d3ebc16
Author: Venkata Jyothsna Donapati <jy...@gmail.com>
AuthorDate: Thu Apr 11 14:16:36 2019 -0700

    DRILL-7171: Create metadata directories cache file in the leaf level directories to support ConvertCountToDirectScan optimization.
    closes #1748
---
 .../exec/store/parquet/metadata/Metadata.java      | 18 ++--
 .../logical/TestConvertCountToDirectScan.java      | 98 ++++++++++++++++++----
 2 files changed, 87 insertions(+), 29 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 59849e7..5459a8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -352,23 +352,15 @@ public class Metadata {
     writeFile(metadataTableWithRelativePaths.fileMetadata, new Path(path, METADATA_FILENAME), fs);
     writeFile(metadataTableWithRelativePaths.getSummary(), new Path(path, METADATA_SUMMARY_FILENAME), fs);
     Metadata_V4.MetadataSummary metadataSummaryWithRelativePaths = metadataTableWithRelativePaths.getSummary();
-
-    if (directoryList.size() > 0 && childFiles.size() == 0) {
-      ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths =
-          new ParquetTableMetadataDirs(metadataSummaryWithRelativePaths.directories);
-      writeFile(parquetTableMetadataDirsRelativePaths, new Path(path, METADATA_DIRECTORIES_FILENAME), fs);
-      if (timer != null) {
-        logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
-      }
-      ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
-      return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
-    }
-    List<Path> emptyDirList = new ArrayList<>();
+    // The directories list will be empty for leaf-level directories. For sub-directories that contain both files
+    // and directories, only the directories will be included in the list.
+    writeFile(new ParquetTableMetadataDirs(metadataSummaryWithRelativePaths.directories),
+        new Path(path, METADATA_DIRECTORIES_FILENAME), fs);
     if (timer != null) {
       logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
     }
-    return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(emptyDirList));
+    return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(directoryList));
   }
 
   /**
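
Editorial note on the hunk above: previously the directories cache file (METADATA_DIRECTORIES_FILENAME) was written only when a directory contained sub-directories and no files, so leaf-level directories had no such file and a COUNT(*) restricted to a leaf directory could not use the metadata summary for the ConvertCountToDirectScan optimization. After this change the file is written unconditionally, with a possibly empty directory list. As the new tests below show, after refresh table metadata parquet_table_counts, a query such as select count(*) from `parquet_table_counts/1` now plans with usedMetadataSummaryFile = true and a DynamicPojoRecordReader.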
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
index 4bd3a0f..eaf9257 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/logical/TestConvertCountToDirectScan.java
@@ -190,11 +190,11 @@ public class TestConvertCountToDirectScan extends PlanTestBase {
       testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
 
       testBuilder()
-              .sqlQuery(sql)
-              .unOrdered()
-              .baselineColumns("star_count", "int_column_count", "vrchr_column_count")
-              .baselineValues(24L, 8L, 12L)
-              .go();
+          .sqlQuery(sql)
+          .unOrdered()
+          .baselineColumns("star_count", "int_column_count", "vrchr_column_count")
+          .baselineValues(24L, 8L, 12L)
+          .go();
 
     } finally {
       test("drop table if exists %s", tableName);
@@ -222,17 +222,17 @@ public class TestConvertCountToDirectScan extends PlanTestBase {
 
       int expectedNumFiles = 1;
       String numFilesPattern = "numFiles = " + expectedNumFiles;
-      String usedMetaSummaryPattern = "usedMetadataSummaryFile = false";
+      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
       String recordReaderPattern = "DynamicPojoRecordReader";
 
       testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
 
       testBuilder()
-              .sqlQuery(sql)
-              .unOrdered()
-              .baselineColumns("star_count", "int_column_count", "vrchr_column_count")
-              .baselineValues(6L, 2L, 3L)
-              .go();
+          .sqlQuery(sql)
+          .unOrdered()
+          .baselineColumns("star_count", "int_column_count", "vrchr_column_count")
+          .baselineValues(6L, 2L, 3L)
+          .go();
 
     } finally {
       test("drop table if exists %s", tableName);
@@ -264,11 +264,77 @@ public class TestConvertCountToDirectScan extends PlanTestBase {
       testPlanMatchingPatterns(sql, new String[]{usedMetaSummaryPattern, recordReaderPattern});
 
       testBuilder()
-              .sqlQuery(sql)
-              .unOrdered()
-              .baselineColumns("star_count")
-              .baselineValues(250L)
-              .go();
+          .sqlQuery(sql)
+          .unOrdered()
+          .baselineColumns("star_count")
+          .baselineValues(250L)
+          .go();
+
+    } finally {
+      test("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testCountsForLeafDirectories() throws Exception {
+    test("use dfs.tmp");
+    String tableName = "parquet_table_counts";
+
+    try {
+      test("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName);
+      test("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName);
+      test("create table `%s/3` as select * from cp.`tpch/nation.parquet`", tableName);
+      test("refresh table metadata %s", tableName);
+
+      String sql = String.format("select\n" +
+              "count(*) as star_count\n" +
+              "from `%s/1`", tableName);
+
+      int expectedNumFiles = 1;
+      String numFilesPattern = "numFiles = " + expectedNumFiles;
+      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
+      String recordReaderPattern = "DynamicPojoRecordReader";
+
+      testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
+
+      testBuilder()
+          .sqlQuery(sql)
+          .unOrdered()
+          .baselineColumns("star_count")
+          .baselineValues(25L)
+          .go();
+
+    } finally {
+      test("drop table if exists %s", tableName);
+    }
+  }
+
+  @Test
+  public void testCountsForDirWithFilesAndDir() throws Exception {
+    test("use dfs.tmp");
+    String tableName = "parquet_table_counts";
+
+    try {
+      test("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName);
+      test("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`", tableName);
+      test("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`", tableName);
+      test("refresh table metadata %s", tableName);
+
+      String sql = String.format("select count(*) as star_count from `%s/1`", tableName);
+
+      int expectedNumFiles = 1;
+      String numFilesPattern = "numFiles = " + expectedNumFiles;
+      String usedMetaSummaryPattern = "usedMetadataSummaryFile = true";
+      String recordReaderPattern = "DynamicPojoRecordReader";
+
+      testPlanMatchingPatterns(sql, new String[]{numFilesPattern, usedMetaSummaryPattern, recordReaderPattern});
+
+      testBuilder()
+          .sqlQuery(sql)
+          .unOrdered()
+          .baselineColumns("star_count")
+          .baselineValues(75L)
+          .go();
 
     } finally {
       test("drop table if exists %s", tableName);


[drill] 07/08: DRILL-7228: Upgrade to a newer version of t-digest to address inaccuracies in histogram buckets. closes #1774

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e191c54cab9ef969e290dc2bf95e100cb4f5b1eb
Author: Aman Sinha <as...@maprtech.com>
AuthorDate: Tue Apr 30 12:03:53 2019 -0700

    DRILL-7228: Upgrade to a newer version of t-digest to address inaccuracies in histogram buckets.
    closes #1774
---
 exec/java-exec/pom.xml                             |   5 +
 .../drill/exec/expr/fn/impl/TDigestFunctions.java  | 140 ++++++++++-----------
 .../impl/statistics/TDigestMergedStatistic.java    |  12 +-
 .../planner/common/NumericEquiDepthHistogram.java  |   4 +-
 4 files changed, 83 insertions(+), 78 deletions(-)

diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2fcd935..c35e935 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -344,6 +344,11 @@
       <version>2.7.0</version>
     </dependency>
     <dependency>
+      <groupId>com.tdunning</groupId>
+      <artifactId>t-digest</artifactId>
+      <version>3.2</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <exclusions>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java
index 041543b..bfcf78d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java
@@ -67,13 +67,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -81,7 +81,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -105,7 +105,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -122,13 +122,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -140,7 +140,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -164,7 +164,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -181,13 +181,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -195,7 +195,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -219,7 +219,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -236,13 +236,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -254,7 +254,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -278,7 +278,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -295,13 +295,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -309,7 +309,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -333,7 +333,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -350,13 +350,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -368,7 +368,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -392,7 +392,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -409,13 +409,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -423,7 +423,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -447,7 +447,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -464,13 +464,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -482,7 +482,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -506,7 +506,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -523,13 +523,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -537,7 +537,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -561,7 +561,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -578,13 +578,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -596,7 +596,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -620,7 +620,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -637,13 +637,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -651,7 +651,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -675,7 +675,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -692,13 +692,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -710,7 +710,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -734,7 +734,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -751,13 +751,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -765,7 +765,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -789,7 +789,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -806,13 +806,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -824,7 +824,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -848,7 +848,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -865,13 +865,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         tdigest.add(in.value);
       }
     }
@@ -879,7 +879,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -903,7 +903,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -920,13 +920,13 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         if (in.isSet == 1) {
           tdigest.add(in.value);
         } else {
@@ -938,7 +938,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (tdigest.size() > 0) {
             int size = tdigest.smallByteSize();
@@ -962,7 +962,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 
@@ -1097,18 +1097,18 @@ public class TDigestFunctions {
     public void setup() {
       work = new ObjectHolder();
       compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
 
     @Override
     public void add() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           if (in.isSet != 0) {
             byte[] buf = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(in.start, in.end, in.buffer).getBytes();
-            com.clearspring.analytics.stream.quantile.TDigest other =
-              com.clearspring.analytics.stream.quantile.TDigest.fromBytes(java.nio.ByteBuffer.wrap(buf));
+            com.tdunning.math.stats.MergingDigest other =
+              com.tdunning.math.stats.MergingDigest.fromBytes(java.nio.ByteBuffer.wrap(buf));
             tdigest.add(other);
           }
         } catch (Exception e) {
@@ -1120,7 +1120,7 @@ public class TDigestFunctions {
     @Override
     public void output() {
       if (work.obj != null) {
-        com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+        com.tdunning.math.stats.MergingDigest tdigest = (com.tdunning.math.stats.MergingDigest) work.obj;
         try {
           int size = tdigest.smallByteSize();
           java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
@@ -1140,7 +1140,7 @@ public class TDigestFunctions {
 
     @Override
     public void reset() {
-      work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+      work.obj = new com.tdunning.math.stats.MergingDigest(compression.value);
     }
   }
 }
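
A minimal, self-contained sketch of the round trip the functions above perform with the new library. The constructor, add(double), add(TDigest), smallByteSize() and MergingDigest.fromBytes() calls appear in the diff itself; asSmallBytes() and quantile() are standard t-digest 3.2 API but should be treated as assumptions here, and the compression value of 100 is arbitrary.

import com.tdunning.math.stats.MergingDigest;
import java.nio.ByteBuffer;

public class TDigestRoundTripSketch {
  public static void main(String[] args) {
    // Build a digest; the compression of 100 is an illustrative value, not Drill's default.
    MergingDigest digest = new MergingDigest(100);
    for (int i = 1; i <= 10000; i++) {
      digest.add(i);                                  // accumulate values, as the add() phases above do
    }

    // Serialize with the compact encoding, mirroring the output() phases above.
    ByteBuffer buf = ByteBuffer.allocate(digest.smallByteSize());
    digest.asSmallBytes(buf);
    buf.flip();

    // Deserialize and merge into a second digest, mirroring the merge functions.
    MergingDigest merged = new MergingDigest(100);
    merged.add(MergingDigest.fromBytes(buf));

    // Query an approximate quantile; the result is approximate by design.
    System.out.println("approx median = " + merged.quantile(0.5));
  }
}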
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java
index cccbff6..dc84eba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.statistics;
 // Library implementing TDigest algorithm to derive approximate quantiles. Please refer to:
 // 'Computing Extremely Accurate Quantiles using t-Digests' by Ted Dunning and Otmar Ertl
 
-import com.clearspring.analytics.stream.quantile.TDigest;
+import com.tdunning.math.stats.MergingDigest;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -33,7 +33,7 @@ import java.util.Map;
 import java.nio.ByteBuffer;
 
 public class TDigestMergedStatistic extends AbstractMergedStatistic {
-  private Map<String, TDigest> tdigestHolder;
+  private Map<String, MergingDigest> tdigestHolder;
   private int compression;
 
   public TDigestMergedStatistic() {
@@ -63,7 +63,7 @@ public class TDigestMergedStatistic extends AbstractMergedStatistic {
     assert (input.getField().getType().getMinorType() == TypeProtos.MinorType.MAP);
     for (ValueVector vv : input) {
       String colName = vv.getField().getName();
-      TDigest colTdigestHolder = null;
+      MergingDigest colTdigestHolder = null;
       if (tdigestHolder.get(colName) != null) {
         colTdigestHolder = tdigestHolder.get(colName);
       }
@@ -71,7 +71,7 @@ public class TDigestMergedStatistic extends AbstractMergedStatistic {
       NullableVarBinaryVector.Accessor accessor = tdigestVector.getAccessor();
 
       if (!accessor.isNull(0)) {
-        TDigest other = TDigest.fromBytes(ByteBuffer.wrap(accessor.get(0)));
+        MergingDigest other = MergingDigest.fromBytes(ByteBuffer.wrap(accessor.get(0)));
         if (colTdigestHolder != null) {
           colTdigestHolder.add(other);
           tdigestHolder.put(colName, colTdigestHolder);
@@ -82,7 +82,7 @@ public class TDigestMergedStatistic extends AbstractMergedStatistic {
     }
   }
 
-  public TDigest getStat(String colName) {
+  public MergingDigest getStat(String colName) {
     if (state != State.COMPLETE) {
       throw new IllegalStateException(String.format("Statistic `%s` has not completed merging statistics",
           name));
@@ -98,7 +98,7 @@ public class TDigestMergedStatistic extends AbstractMergedStatistic {
     assert (state == State.MERGE);
     for (ValueVector outMapCol : output) {
       String colName = outMapCol.getField().getName();
-      TDigest colTdigestHolder = tdigestHolder.get(colName);
+      MergingDigest colTdigestHolder = tdigestHolder.get(colName);
       NullableVarBinaryVector vv = (NullableVarBinaryVector) outMapCol;
       vv.allocateNewSafe();
       try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java
index 9d5bf6f..0cda7c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexLiteral;
-import com.clearspring.analytics.stream.quantile.TDigest;
+import com.tdunning.math.stats.MergingDigest;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -255,7 +255,7 @@ public class NumericEquiDepthHistogram implements Histogram {
   public static NumericEquiDepthHistogram buildFromTDigest(final byte[] tdigest_array,
                                                            final int numBuckets,
                                                            final long nonNullCount) {
-    TDigest tdigest = TDigest.fromBytes(java.nio.ByteBuffer.wrap(tdigest_array));
+    MergingDigest tdigest = MergingDigest.fromBytes(java.nio.ByteBuffer.wrap(tdigest_array));
 
     NumericEquiDepthHistogram histogram = new NumericEquiDepthHistogram(numBuckets);
 


[drill] 01/08: DRILL-7062: Initial implementation of run-time rowgroup pruning closes #1738

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9dc81e199cbabedbdd68acfa676a66a022b51907
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Tue Mar 26 20:11:55 2019 -0700

    DRILL-7062: Initial implementation of run-time rowgroup pruning
    closes #1738
---
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  18 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |   3 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |   4 +
 .../base/AbstractGroupScanWithMetadata.java        |  43 +++-
 .../apache/drill/exec/physical/impl/ScanBatch.java |   4 +-
 .../exec/planner/physical/PlannerSettings.java     |   6 +-
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../exec/store/CommonParquetRecordReader.java      |  85 +++++++
 .../store/parquet/AbstractParquetGroupScan.java    |  11 +
 .../store/parquet/AbstractParquetRowGroupScan.java |  18 +-
 .../parquet/AbstractParquetScanBatchCreator.java   | 262 +++++++++++++++++----
 .../drill/exec/store/parquet/ParquetGroupScan.java |   3 +-
 .../exec/store/parquet/ParquetPushDownFilter.java  |  38 ++-
 .../exec/store/parquet/ParquetReaderStats.java     |  13 +-
 .../exec/store/parquet/ParquetRowGroupScan.java    |  23 +-
 .../store/parquet/ParquetTableMetadataUtils.java   |   9 +-
 .../parquet/columnreaders/ParquetRecordReader.java |  64 +----
 .../store/parquet/columnreaders/ReadState.java     |   7 +
 .../batchsizing/RecordBatchSizerManager.java       |   3 +-
 .../exec/store/parquet/metadata/Metadata.java      |  70 ++++--
 .../exec/store/parquet/metadata/Metadata_V4.java   |   2 +
 .../exec/store/parquet2/DrillParquetReader.java    |  25 +-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../store/parquet/TestParquetFilterPushDown.java   |  66 +++++-
 .../TestPushDownAndPruningWithItemStar.java        |  16 +-
 25 files changed, 596 insertions(+), 199 deletions(-)
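
The diffstat above only lists the touched files, so here is a hedged, conceptual sketch of what run-time row-group pruning means: at scan setup time the pushed-down filter is evaluated against each row group's column statistics, and row groups whose value range cannot possibly match are skipped before any data is read. Everything below (class names, the single greater-than predicate, the numbers) is invented for illustration and is not Drill's AbstractParquetScanBatchCreator code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowGroupPruningSketch {

  // Hypothetical stand-in for one row group's min/max statistics on the filtered column.
  static class RowGroupStats {
    final long min;
    final long max;
    RowGroupStats(long min, long max) { this.min = min; this.max = max; }
  }

  // Keep only row groups that may contain rows satisfying "col > threshold";
  // a row group whose max does not exceed the threshold cannot match and is skipped.
  static List<RowGroupStats> pruneGreaterThan(List<RowGroupStats> rowGroups, long threshold) {
    List<RowGroupStats> kept = new ArrayList<>();
    for (RowGroupStats rg : rowGroups) {
      if (rg.max > threshold) {
        kept.add(rg);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<RowGroupStats> rowGroups = Arrays.asList(
        new RowGroupStats(0, 50), new RowGroupStats(40, 120), new RowGroupStats(200, 300));
    // Prints "2 of 3 row groups kept": the first row group cannot contain values above 100.
    System.out.println(pruneGreaterThan(rowGroups, 100).size() + " of 3 row groups kept");
  }
}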

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index 78e107a..09533bb 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -48,6 +49,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   private final HiveStoragePluginConfig hiveStoragePluginConfig;
   private final HivePartitionHolder hivePartitionHolder;
   private final Map<String, String> confProperties;
+  private final TupleSchema tupleSchema;
 
   @JsonCreator
   public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -58,7 +60,8 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                             @JsonProperty("confProperties") Map<String, String> confProperties,
                                             @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
-                                            @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
+                                            @JsonProperty("filter") LogicalExpression filter,
+                                            @JsonProperty("tupleSchema") TupleSchema tupleSchema) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
         rowGroupReadEntries,
@@ -66,7 +69,8 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
         hivePartitionHolder,
         confProperties,
         readerConfig,
-        filter);
+        filter,
+        tupleSchema);
   }
 
   public HiveDrillNativeParquetRowGroupScan(String userName,
@@ -76,12 +80,14 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             HivePartitionHolder hivePartitionHolder,
                                             Map<String, String> confProperties,
                                             ParquetReaderConfig readerConfig,
-                                            LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
+                                            LogicalExpression filter,
+                                            TupleSchema tupleSchema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter, null, tupleSchema);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
     this.confProperties = confProperties;
+    this.tupleSchema = tupleSchema;
   }
 
   @JsonProperty
@@ -108,7 +114,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, tupleSchema);
   }
 
   @Override
@@ -119,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, tupleSchema);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 0b9a463..94014fe 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.metastore.LocationProvider;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -143,7 +144,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       subPartitionHolder.add(readEntry.getPath(), values);
     }
     return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder,
-      confProperties, readerConfig, filter);
+      confProperties, readerConfig, filter, (TupleSchema) getTableMetadata().getSchema());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 24372ef..3503507 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -727,6 +727,10 @@ public final class ExecConstants {
 
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
 
+  public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning";
+  public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,
+    new OptionDescription("Enables skipping the run-time pruning of row groups"));
+
   public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
 
   public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
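The new option is a plain boolean session/system option, so it can be toggled with ALTER SESSION. A small sketch using Drill's JDBC driver follows; the connection URL, workspace, and table name are placeholders, and the drill-jdbc driver is assumed to be on the classpath.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SkipRuntimePruningExample {
      public static void main(String[] args) throws Exception {
        // Placeholder URL; adjust to your Drill deployment.
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement()) {
          // Disable run-time row group pruning for this session only.
          stmt.execute("ALTER SESSION SET `exec.storage.skip_runtime_rowgroup_pruning` = true");
          try (ResultSet rs = stmt.executeQuery(
              "SELECT count(*) FROM dfs.tmp.`events` WHERE event_ts > '2019-01-01'")) {
            while (rs.next()) {
              System.out.println(rs.getLong(1));
            }
          }
        }
      }
    }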
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 59d099f..15e1387 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.stat.RowsMatch;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.MaterializedField;
@@ -68,6 +69,9 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+// import static org.apache.drill.exec.ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS;
+import static org.apache.drill.exec.ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY;
+
 /**
  * Represents table group scan with metadata usage.
  */
@@ -191,12 +195,25 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
     this.filter = filter;
   }
 
+  /**
+   * Sets the filter, thus enabling run-time row group pruning.
+   * Run-time pruning can be disabled with an option.
+   * @param filterExpr the filter to be matched against row group footers at run time
+   * @param optimizerContext the context providing the planner options
+   */
+  public void setFilterForRuntime(LogicalExpression filterExpr, OptimizerRulesContext optimizerContext) {
+    OptionManager options = optimizerContext.getPlannerSettings().getOptions();
+    boolean skipRuntimePruning = options.getBoolean(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY); // if option is set to disable runtime pruning
+    if ( ! skipRuntimePruning ) { setFilter(filterExpr); }
+  }
+
   @Override
   public AbstractGroupScanWithMetadata applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
 
     // Builds filter for pruning. If filter cannot be built, null should be returned.
-    FilterPredicate filterPredicate = getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
+    FilterPredicate filterPredicate =
+            getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
     if (filterPredicate == null) {
       logger.debug("FilterPredicate cannot be built.");
       return null;
@@ -271,6 +288,14 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
    */
   protected abstract GroupScanWithMetadataFilterer getFilterer();
 
+  public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
+                                                   UdfUtilities udfUtilities,
+                                                   FunctionImplementationRegistry functionImplementationRegistry,
+                                                   OptionManager optionManager,
+                                                   boolean omitUnsupportedExprs) {
+    return getFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager,
+            omitUnsupportedExprs, supportsFileImplicitColumns(), (TupleSchema) getTableMetadata().getSchema());
+  }
   /**
    * Returns parquet filter predicate built from specified {@code filterExpr}.
    *
@@ -281,20 +306,19 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
    *                                       may be omitted from the resulting expression
    * @return parquet filter predicate
    */
-  public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
+  public static FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
                                             UdfUtilities udfUtilities,
                                             FunctionImplementationRegistry functionImplementationRegistry,
                                             OptionManager optionManager,
-                                            boolean omitUnsupportedExprs) {
-    TupleMetadata types = getSchema();
-    if (types == null) {
-      throw new UnsupportedOperationException("At least one schema source should be available.");
-    }
+                                            boolean omitUnsupportedExprs,
+                                            boolean supportsFileImplicitColumns,
+                                            TupleSchema tupleSchema) {
+    TupleMetadata types = tupleSchema.copy();
 
     Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new FilterEvaluatorUtils.FieldReferenceFinder(), null);
 
     // adds implicit or partition columns if they weren't added before.
-    if (supportsFileImplicitColumns()) {
+    if (supportsFileImplicitColumns) {
       for (SchemaPath schemaPath : schemaPathsInExpr) {
         if (isImplicitOrPartCol(schemaPath, optionManager) && SchemaPathUtils.getColumnMetadata(schemaPath, types) == null) {
           types.add(MaterializedField.create(schemaPath.getRootSegmentPath(), Types.required(TypeProtos.MinorType.VARCHAR)));
@@ -467,7 +491,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
   protected abstract boolean supportsFileImplicitColumns();
   protected abstract List<String> getPartitionValues(LocationProvider locationProvider);
 
-  protected boolean isImplicitOrPartCol(SchemaPath schemaPath, OptionManager optionManager) {
+  public static boolean isImplicitOrPartCol(SchemaPath schemaPath, OptionManager optionManager) {
     Set<String> implicitColNames = ColumnExplorer.initImplicitFileColumns(optionManager).keySet();
     return ColumnExplorer.isPartitionColumn(optionManager, schemaPath) || implicitColNames.contains(schemaPath.getRootSegmentPath());
   }
@@ -628,7 +652,6 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
             matchAllMetadata = true;
             partitions = filterAndGetMetadata(schemaPathsInExpr, source.getPartitionsMetadata(), filterPredicate, optionManager);
           } else {
-            matchAllMetadata = false;
             overflowLevel = MetadataLevel.PARTITION;
           }
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 07e2ae5..cdb36f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -72,7 +72,7 @@ public class ScanBatch implements CloseableRecordBatch {
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
-  private Iterator<RecordReader> readers;
+  private Iterator<? extends RecordReader> readers;
   private RecordReader currentReader;
   private BatchSchema schema;
   private final Mutator mutator;
@@ -100,7 +100,7 @@ public class ScanBatch implements CloseableRecordBatch {
    *                        columns, or there is a one-to-one mapping between reader and implicitColumns.
    */
   public ScanBatch(FragmentContext context,
-                   OperatorContext oContext, List<RecordReader> readerList,
+                   OperatorContext oContext, List<? extends RecordReader> readerList,
                    List<Map<String, String>> implicitColumnList) {
     this.context = context;
     this.readers = readerList.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 0a18126..a5506d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -182,8 +182,10 @@ public class PlannerSettings implements Context{
   public static final BooleanValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING = new BooleanValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY,
       new OptionDescription("Enables filter pushdown optimization for Parquet files. Drill reads the file metadata, stored in the footer, to eliminate row groups based on the filter condition. Default is true. (Drill 1.9+)"));
   public static final String PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY = "planner.store.parquet.rowgroup.filter.pushdown.threshold";
-  public static final PositiveLongValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new PositiveLongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY, Long.MAX_VALUE,
-      new OptionDescription("Sets the number of row groups that a table can have. You can increase the threshold if the filter can prune many row groups. However, if this setting is too high, the filter evaluation overhead increases. Base this setting on the data set. Reduce this setting if the planning time is significant or you do not see any benefit at runtime. (Drill 1.9+)"));
+  public static final LongValidator PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD = new LongValidator(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY,
+      new OptionDescription("Maximum number of row groups a table may have for the planner to apply row group pruning. Base this setting on the data set: increasing it " +
+        "adds planning overhead but may reduce execution overhead when the filter is selective (e.g., on a sorted column, or one with many nulls). " +
+        "Reduce this setting if planning time is significant or you see no benefit at runtime. A non-positive value disables plan-time pruning."));
 
   public static final String QUOTING_IDENTIFIERS_KEY = "planner.parser.quoting_identifiers";
   public static final EnumeratedStringValidator QUOTING_IDENTIFIERS = new EnumeratedStringValidator(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6719db7..61fefe7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -244,6 +244,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.SCALAR_REPLACEMENT_VALIDATOR),
       new OptionDefinition(ExecConstants.ENABLE_NEW_TEXT_READER),
       new OptionDefinition(ExecConstants.ENABLE_V3_TEXT_READER),
+      new OptionDefinition(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING),
       new OptionDefinition(ExecConstants.MIN_READER_WIDTH),
       new OptionDefinition(ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST),
       new OptionDefinition(ExecConstants.BULK_LOAD_TABLE_LIST_BULK_SIZE),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
new file mode 100644
index 0000000..1f6f367
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/CommonParquetRecordReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.slf4j.Logger;
+
+public abstract class CommonParquetRecordReader extends AbstractRecordReader {
+  protected final FragmentContext fragmentContext;
+
+  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+
+  protected OperatorContext operatorContext;
+
+  protected ParquetMetadata footer;
+
+  public CommonParquetRecordReader(ParquetMetadata footer, FragmentContext fragmentContext) {
+    this.footer = footer;
+    this.fragmentContext = fragmentContext;
+  }
+
+  public void updateRowgroupsStats(long numRowgroups, long rowgroupsPruned) {
+    parquetReaderStats.numRowgroups.set(numRowgroups);
+    parquetReaderStats.rowgroupsPruned.set(rowgroupsPruned);
+  }
+
+  public enum Metric implements MetricDef {
+    NUM_ROWGROUPS,               // Number of rowgroups assigned to this minor fragment
+    ROWGROUPS_PRUNED,            // Number of rowgroups pruned out at runtime
+    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
+    NUM_DATA_PAGE_lOADS,         // Number of data pages read
+    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
+    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
+    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
+    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary pages
+    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
+    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
+    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
+    TIME_DICT_PAGE_LOADS,          // Time in nanos in reading dictionary pages from disk
+    TIME_DATA_PAGE_LOADS,          // Time in nanos in reading data pages from disk
+    TIME_DATA_PAGE_DECODE,         // Time in nanos in decoding data pages
+    TIME_DICT_PAGE_DECODE,         // Time in nanos in decoding dictionary pages
+    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing dictionary pages
+    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
+    TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an async disk read to complete
+    TIME_DISK_SCAN,                // Time in nanos spent in reading data from disk.
+    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed width data to value vectors
+    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting varwidth data to value vectors
+    TIME_PROCESS;                  // Time in nanos spent in processing
+
+    @Override public int metricId() {
+      return ordinal();
+    }
+  }
+
+  protected void closeStats(Logger logger, Path hadoopPath) {
+    if (parquetReaderStats != null) {
+      if ( operatorContext != null ) {
+        parquetReaderStats.update(operatorContext.getStats());
+      }
+      parquetReaderStats.logStats(logger, hadoopPath);
+      parquetReaderStats = null;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 5986b08..e368bb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -267,6 +267,17 @@ public abstract class AbstractParquetGroupScan extends AbstractGroupScanWithMeta
         // no need to create new group scan with the same row group.
         return null;
       }
+
+      // Stop files pruning for the case:
+      //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
+      if (getRowGroupsMetadata().size() >= optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
+        this.rowGroups = getRowGroupsMetadata();
+        matchAllMetadata = false;
+        logger.trace("Stopping plan time pruning. Metadata has {} rowgroups, but the threshold option is set to {} rowgroups", this.rowGroups.size(),
+          optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD));
+        return null;
+      }
+
       logger.debug("All row groups have been filtered out. Add back one to get schema from scanner");
 
       Map<Path, FileMetadata> filesMap = getNextOrEmpty(getFilesMetadata().values()).stream()
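In short, plan-time pruning now backs off once a table has at least as many row groups as the threshold option allows, leaving the (optional) run-time pruning to do the work. A simplified restatement of that guard, with hard-coded numbers for illustration:

    public class PlanTimePruningThresholdSketch {
      // Simplified restatement of the guard added to AbstractParquetGroupScan:
      // plan-time pruning only runs while the row group count stays below the
      // threshold option; a non-positive threshold therefore disables it entirely.
      static boolean planTimePruningApplies(long rowGroupCount, long threshold) {
        return rowGroupCount < threshold;
      }

      public static void main(String[] args) {
        System.out.println(planTimePruningApplies(5_000, 10_000));   // true  -> prune at plan time
        System.out.println(planTimePruningApplies(50_000, 10_000));  // false -> defer to run-time pruning
        System.out.println(planTimePruningApplies(1, 0));            // false -> plan-time pruning disabled
      }
    }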
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 52b2baa..f05175e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -27,7 +27,9 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,17 +42,23 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
   protected final List<SchemaPath> columns;
   protected final ParquetReaderConfig readerConfig;
   protected final LogicalExpression filter;
+  protected final Path selectionRoot;
+  protected final TupleSchema tupleSchema;
 
   protected AbstractParquetRowGroupScan(String userName,
                                      List<RowGroupReadEntry> rowGroupReadEntries,
                                      List<SchemaPath> columns,
                                      ParquetReaderConfig readerConfig,
-                                     LogicalExpression filter) {
+                                     LogicalExpression filter,
+                                     Path selectionRoot,
+                                     TupleSchema tupleSchema) {
     super(userName);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
     this.readerConfig = readerConfig == null ? ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
+    this.selectionRoot = selectionRoot;
+    this.tupleSchema = tupleSchema;
   }
 
   @JsonProperty
@@ -95,6 +103,14 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
     return Collections.emptyIterator();
   }
 
+  @JsonProperty
+  public Path getSelectionRoot() {
+    return selectionRoot;
+  }
+
+  @JsonProperty
+  public TupleSchema getTupleSchema() { return tupleSchema; }
+
   public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
   @JsonIgnore
   public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index b1819e6..41f52d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -17,6 +17,19 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.exec.expr.FilterPredicate;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.expr.stat.RowsMatch;
+import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.CommonParquetRecordReader;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
+import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
+import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.shaded.guava.com.google.common.base.Functions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -28,11 +41,11 @@ import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
-import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -42,10 +55,12 @@ import org.apache.parquet.hadoop.util.HadoopInputFile;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractParquetScanBatchCreator {
@@ -65,21 +80,65 @@ public abstract class AbstractParquetScanBatchCreator {
 
     // keep footers in a map to avoid re-reading them
     Map<Path, ParquetMetadata> footers = new HashMap<>();
-    List<RecordReader> readers = new LinkedList<>();
+    List<CommonParquetRecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
-    for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
-      /*
-      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be populated upon the first read to
-      provide the reader with all of th file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    RowGroupReadEntry firstRowGroup = null; // to be scanned in case ALL row groups are pruned out
+    ParquetMetadata firstFooter = null;
+    long rowgroupsPruned = 0; // for stats
+    TupleSchema tupleSchema = rowGroupScan.getTupleSchema();
+
+    try {
+
+      LogicalExpression filterExpr = rowGroupScan.getFilter();
+      boolean doRuntimePruning = filterExpr != null && // a filter was given, and it is not just a "TRUE" predicate
+        !((filterExpr instanceof ValueExpressions.BooleanExpression) && ((ValueExpressions.BooleanExpression) filterExpr).getBoolean());
+
+      // Runtime pruning: Avoid recomputing metadata objects for each row-group in case they use the same file
+      // by keeping the following objects computed earlier (relies on same file being in consecutive rowgroups)
+      Path prevRowGroupPath = null;
+      Metadata_V4.ParquetTableMetadata_v4 tableMetadataV4 = null;
+      Metadata_V4.ParquetFileAndRowCountMetadata fileMetadataV4 = null;
+      FilterPredicate filterPredicate = null;
+      Set<SchemaPath> schemaPathsInExpr = null;
+      Set<String> columnsInExpr = null;
+      // for debug/info logging
+      long totalPruneTime = 0;
+      long totalRowgroups = rowGroupScan.getRowGroupReadEntries().size();
+      Stopwatch pruneTimer = Stopwatch.createUnstarted();
+
+      // If pruning - Prepare the predicate and the columns before the FOR LOOP
+      if ( doRuntimePruning ) {
+        filterPredicate = AbstractGroupScanWithMetadata.getFilterPredicate(filterExpr, context,
+          (FunctionImplementationRegistry) context.getFunctionRegistry(), context.getOptions(), true,
+          true /* supports file implicit columns */,
+          tupleSchema);
+        // Extract only the relevant columns from the filter (sans implicit columns, if any)
+        schemaPathsInExpr = filterExpr.accept(new FilterEvaluatorUtils.FieldReferenceFinder(), null);
+        columnsInExpr = new HashSet<>();
+        String partitionColumnLabel = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+        for (SchemaPath path : schemaPathsInExpr) {
+          if (rowGroupScan.supportsFileImplicitColumns() &&
+            path.toString().matches(partitionColumnLabel+"\\d+")) {
+            continue;  // skip implicit columns like dir0, dir1
+          }
+          columnsInExpr.add(path.getRootSegmentPath());
+        }
+        doRuntimePruning = ! columnsInExpr.isEmpty(); // just in case: if no columns - cancel pruning
+      }
+
+      for (RowGroupReadEntry rowGroup : rowGroupScan.getRowGroupReadEntries()) {
+        /*
+        Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
+        TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
+        we should add more information to the RowGroupInfo that will be populated upon the first read to
+        provide the reader with all of the file metadata it needs
+        These fields will be added to the constructor below
+        */
+
         Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
         DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
-        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
         if (!footers.containsKey(rowGroup.getPath())) {
           if (timer != null) {
             timer.start();
@@ -94,50 +153,79 @@ public abstract class AbstractParquetScanBatchCreator {
         }
         ParquetMetadata footer = footers.get(rowGroup.getPath());
 
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
-          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
-        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
-
-        boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
-        boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
-        logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", useNewReader ? "enabled" : "disabled",
-            containsComplexColumn ? "found." : "not found.");
-        RecordReader reader;
-
-        if (useNewReader || containsComplexColumn) {
-          reader = new DrillParquetReader(context,
-              footer,
-              rowGroup,
-              columnExplorer.getTableColumns(),
-              fs,
-              containsCorruptDates);
-        } else {
-          reader = new ParquetRecordReader(context,
-              rowGroup.getPath(),
-              rowGroup.getRowGroupIndex(),
-              rowGroup.getNumRecordsToRead(),
-              fs,
-              CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-              footer,
-              rowGroupScan.getColumns(),
-              containsCorruptDates);
-        }
+        //
+        //   If a filter is given (and it is not just "TRUE") - then use it to perform run-time pruning
+        //
+        if (doRuntimePruning) { // skip when no filter or filter is TRUE
+
+          pruneTimer.start();
+
+          int rowGroupIndex = rowGroup.getRowGroupIndex();
+          long footerRowCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+          // When starting a new file, or at the first time - Initialize the path specific metadata
+          if (!rowGroup.getPath().equals(prevRowGroupPath)) {
+            // Create a table metadata (V4)
+            tableMetadataV4 = new Metadata_V4.ParquetTableMetadata_v4();
+
+            // The file status for this file
+            FileStatus fileStatus = fs.getFileStatus(rowGroup.getPath());
+
+            // The file metadata (only for the columns used in the filter)
+            fileMetadataV4 = Metadata.getParquetFileMetadata_v4(tableMetadataV4, footer, fileStatus, fs, false, true, columnsInExpr, readerConfig);
+
+            prevRowGroupPath = rowGroup.getPath(); // for next time
+          }
+
+          MetadataBase.RowGroupMetadata rowGroupMetadata = fileMetadataV4.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
+
+          Map<SchemaPath, ColumnStatistics> columnsStatistics = ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, rowGroupMetadata);
 
-        logger.debug("Query {} uses {}",
-            QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
-            reader.getClass().getSimpleName());
-        readers.add(reader);
+          //
+          // Perform the Run-Time Pruning - i.e. Skip this rowgroup if the match fails
+          //
+          RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount);
 
-        List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
-        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, rowGroupScan.supportsFileImplicitColumns());
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
+          // collect logging info
+          long timeToRead = pruneTimer.elapsed(TimeUnit.MICROSECONDS);
+          pruneTimer.stop();
+          pruneTimer.reset();
+          totalPruneTime += timeToRead;
+          logger.trace("Run-time pruning: {} row-group {} (RG index: {} row count: {}), took {} usec", // trace each single rowgroup
+            match == RowsMatch.NONE ? "Excluded" : "Included", rowGroup.getPath(), rowGroupIndex, footerRowCount, timeToRead);
+
+          // If this rowgroup failed the match - skip it
+          if (match == RowsMatch.NONE) {
+            rowgroupsPruned++; // one more RG was pruned
+            if (firstRowGroup == null) {  // keep first RG, to be used in case all row groups are pruned
+              firstRowGroup = rowGroup;
+              firstFooter = footer;
+            }
+            continue; // This Row group does not comply with the filter - prune it out and check the next Row Group
+          }
         }
 
-      } catch (IOException e) {
-        throw new ExecutionSetupException(e);
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, rowGroupScan, oContext, columnExplorer, readers, implicitColumns, mapWithMaxColumns, rowGroup, fs, footer, false);
+      }
+
+      // in case all row groups were pruned out - create a single reader for the first one (so that the schema could be returned)
+      if ( readers.size() == 0 && firstRowGroup != null ) {
+        DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(firstRowGroup), firstRowGroup.getPath());
+        mapWithMaxColumns = createReaderAndImplicitColumns(context, rowGroupScan, oContext, columnExplorer, readers, implicitColumns, mapWithMaxColumns, firstRowGroup, fs,
+          firstFooter, true);
+      }
+      if ( totalPruneTime > 0 ) {
+        logger.info("Finished parquet_runtime_pruning in {} usec. Out of given {} rowgroups, {} were pruned. {}", totalPruneTime, totalRowgroups, rowgroupsPruned,
+          totalRowgroups == rowgroupsPruned ? "ALL_PRUNED !!" : "");
       }
+
+      // Update stats (same in every reader - the others would just overwrite the stats)
+      for (CommonParquetRecordReader rr : readers ) {
+          rr.updateRowgroupsStats(totalRowgroups, rowgroupsPruned);
+      }
+
+    } catch (IOException|InterruptedException e) {
+      throw new ExecutionSetupException(e);
     }
 
     // all readers should have the same number of implicit columns, add missing ones with value null
@@ -149,6 +237,78 @@ public abstract class AbstractParquetScanBatchCreator {
     return new ScanBatch(context, oContext, readers, implicitColumns);
   }
 
+  /**
+   *  Create a reader and add it to the list of readers.
+   *
+   * @param context The fragment context
+   * @param rowGroupScan RowGroup Scan
+   * @param oContext Operator context
+   * @param columnExplorer The column helper class object
+   * @param readers the list of readers to which the new reader is added
+   * @param implicitColumns the implicit columns list
+   * @param mapWithMaxColumns To be modified, in case there are implicit columns
+   * @param rowGroup create a reader for this specific row group
+   * @param fs file system
+   * @param footer this file's footer
+   * @param readSchemaOnly if true, sets the number of rows to read to zero
+   * @return the (possibly modified) input mapWithMaxColumns
+   */
+  private Map<String, String> createReaderAndImplicitColumns(ExecutorFragmentContext context,
+                                                             AbstractParquetRowGroupScan rowGroupScan,
+                                                             OperatorContext oContext,
+                                                             ColumnExplorer columnExplorer,
+                                                             List<CommonParquetRecordReader> readers,
+                                                             List<Map<String, String>> implicitColumns,
+                                                             Map<String, String> mapWithMaxColumns,
+                                                             RowGroupReadEntry rowGroup,
+                                                             DrillFileSystem fs,
+                                                             ParquetMetadata footer,
+                                                             boolean readSchemaOnly
+  ) {
+    ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
+      rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
+    logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+
+    boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
+    boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
+    logger.debug("PARQUET_NEW_RECORD_READER is {}. Complex columns {}.", useNewReader ? "enabled" : "disabled",
+        containsComplexColumn ? "found." : "not found.");
+    CommonParquetRecordReader reader;
+
+    if (useNewReader || containsComplexColumn) {
+      reader = new DrillParquetReader(context,
+          footer,
+          rowGroup,
+          columnExplorer.getTableColumns(),
+          fs,
+          containsCorruptDates); // TODO: if readSchemaOnly - then set to zero rows to read (currently fails)
+    } else {
+      reader = new ParquetRecordReader(context,
+          rowGroup.getPath(),
+          rowGroup.getRowGroupIndex(),
+          readSchemaOnly ? 0 : rowGroup.getNumRecordsToRead(), // if readSchemaOnly - then set to zero rows to read
+          fs,
+          CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+          footer,
+          rowGroupScan.getColumns(),
+          containsCorruptDates);
+    }
+
+    logger.debug("Query {} uses {}",
+        QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()),
+        reader.getClass().getSimpleName());
+    readers.add(reader);
+
+    List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+    Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), partitionValues, rowGroupScan.supportsFileImplicitColumns());
+    implicitColumns.add(implicitValues);
+    if (implicitValues.size() > mapWithMaxColumns.size()) {
+      mapWithMaxColumns = implicitValues;
+    }
+    return mapWithMaxColumns;
+  }
+
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index e4fd382..c35e21b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -206,7 +206,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter,
+      tableMetadata == null ? null : (TupleSchema) tableMetadata.getSchema());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 9e39b54..bd2e119 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.calcite.rel.core.Filter;
 import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
 import org.apache.drill.exec.expr.FilterPredicate;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
@@ -170,6 +171,8 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
     LogicalExpression conditionExp = DrillOptiq.toDrill(
         new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifiedPred);
 
+    // Default: pass the original filter expression to (potentially) be used at run time
+    groupScan.setFilterForRuntime(conditionExp, optimizerContext); // later may remove or set to another filter (see below)
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     AbstractGroupScanWithMetadata newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
@@ -187,6 +190,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
         // If current row group fully matches filter,
         // but row group pruning did not happen, remove the filter.
         if (nonConvertedPredList.isEmpty()) {
+          groupScan.setFilterForRuntime(null, optimizerContext); // disable the original filter expr (i.e. don't use it at run-time)
           call.transformTo(child);
         } else if (nonConvertedPredList.size() == predList.size()) {
           // None of the predicates participated in filter pushdown.
@@ -194,11 +198,18 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
         } else {
           // If some of the predicates weren't used in the filter, creates new filter with them
           // on top of current scan. Excludes the case when all predicates weren't used in the filter.
-          call.transformTo(filter.copy(filter.getTraitSet(), child,
-              RexUtil.composeConjunction(
-                  filter.getCluster().getRexBuilder(),
-                  nonConvertedPredList,
-                  true)));
+          Filter theNewFilter  = filter.copy(filter.getTraitSet(), child,
+            RexUtil.composeConjunction(
+              filter.getCluster().getRexBuilder(),
+              nonConvertedPredList,
+              true));
+
+          LogicalExpression filterPredicate = DrillOptiq.toDrill(
+            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, theNewFilter.getCondition());
+
+          groupScan.setFilterForRuntime(filterPredicate, optimizerContext); // pass the new filter expr to (potentially) be used at run-time
+
+          call.transformTo(theNewFilter); // Replace the child with the new filter on top of the child/scan
         }
       }
       return;
@@ -213,11 +224,18 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
     if (newGroupScan.isMatchAllMetadata()) {
       // creates filter from the expressions which can't be pushed to the scan
       if (!nonConvertedPredList.isEmpty()) {
-        newNode = filter.copy(filter.getTraitSet(), newNode,
-            RexUtil.composeConjunction(
-                filter.getCluster().getRexBuilder(),
-                nonConvertedPredList,
-                true));
+        Filter theFilterRel  = filter.copy(filter.getTraitSet(), newNode,
+          RexUtil.composeConjunction(
+            filter.getCluster().getRexBuilder(),
+            nonConvertedPredList,
+            true));
+
+        LogicalExpression filterPredicate = DrillOptiq.toDrill(
+          new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, theFilterRel.getCondition());
+
+        newGroupScan.setFilterForRuntime(filterPredicate, optimizerContext); // pass the new filter expr to (potentially) be used at run-time
+
+        newNode = theFilterRel; // replace the new node with the new filter on top of that new node
       }
       call.transformTo(newNode);
       return;
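The net effect of these ParquetPushDownFilter changes is to decide which expression, if any, the scan keeps for run-time pruning: none when plan-time pruning already fully matched with no residual predicates, only the residual predicates when part of the filter stays above the scan, and the original filter otherwise. A simplified, hypothetical sketch of that decision, using strings in place of Drill's LogicalExpression and RexNode types:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class RuntimeFilterSelectionSketch {
      static String chooseRuntimeFilter(String originalFilter,
                                        List<String> allPredicates,
                                        List<String> nonConvertedPredicates,
                                        boolean matchAllMetadata) {
        if (matchAllMetadata && nonConvertedPredicates.isEmpty()) {
          // Every row group fully matches and nothing is left over:
          // the filter is dropped, so no run-time pruning is needed.
          return null;
        }
        if (!nonConvertedPredicates.isEmpty()
            && nonConvertedPredicates.size() < allPredicates.size()) {
          // Only the residual predicates stay in the Filter above the scan,
          // so only those are worth re-checking against row group footers.
          return String.join(" AND ", nonConvertedPredicates);
        }
        // Default: keep the original filter for run-time pruning.
        return originalFilter;
      }

      public static void main(String[] args) {
        System.out.println(chooseRuntimeFilter("a > 5 AND b LIKE 'x%'",
            Arrays.asList("a > 5", "b LIKE 'x%'"),
            Collections.singletonList("b LIKE 'x%'"),
            false));
      }
    }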
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index 6a7b967..8b1c43d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -20,11 +20,14 @@ package org.apache.drill.exec.store.parquet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader.Metric;
+import org.apache.drill.exec.store.CommonParquetRecordReader.Metric;
 import org.apache.hadoop.fs.Path;
 
 public class ParquetReaderStats {
 
+  public AtomicLong numRowgroups = new AtomicLong();
+  public AtomicLong rowgroupsPruned = new AtomicLong();
+
   public AtomicLong numDictPageLoads = new AtomicLong();
   public AtomicLong numDataPageLoads = new AtomicLong();
   public AtomicLong numDataPagesDecoded = new AtomicLong();
@@ -54,8 +57,10 @@ public class ParquetReaderStats {
 
   public void logStats(org.slf4j.Logger logger, Path hadoopPath) {
     logger.trace(
-        "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+        "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
         hadoopPath,
+        numRowgroups,
+        rowgroupsPruned,
         numDictPageLoads,
         numDataPageLoads,
         numDataPagesDecoded,
@@ -79,6 +84,10 @@ public class ParquetReaderStats {
   }
 
   public void update(OperatorStats stats){
+    stats.setLongStat(Metric.NUM_ROWGROUPS,
+        numRowgroups.longValue());
+    stats.setLongStat(Metric.ROWGROUPS_PRUNED,
+        rowgroupsPruned.longValue());
     stats.addLongStat(Metric.NUM_DICT_PAGE_LOADS,
         numDictPageLoads.longValue());
     stats.addLongStat(Metric.NUM_DATA_PAGE_lOADS, numDataPageLoads.longValue());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index 2513772..fcbf307 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
@@ -46,7 +47,6 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
-  private final Path selectionRoot;
 
   @JsonCreator
   public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -57,14 +57,16 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              @JsonProperty("columns") List<SchemaPath> columns,
                              @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                              @JsonProperty("selectionRoot") Path selectionRoot,
-                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
+                             @JsonProperty("filter") LogicalExpression filter,
+                             @JsonProperty("tupleSchema") TupleSchema tupleSchema) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
         rowGroupReadEntries,
         columns,
         readerConfig,
         selectionRoot,
-        filter);
+        filter,
+        tupleSchema);
   }
 
   public ParquetRowGroupScan(String userName,
@@ -73,11 +75,11 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              List<SchemaPath> columns,
                              ParquetReaderConfig readerConfig,
                              Path selectionRoot,
-                             LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
+                             LogicalExpression filter,
+                             TupleSchema tupleSchema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter, selectionRoot, tupleSchema);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
     this.formatConfig = formatPlugin.getConfig();
-    this.selectionRoot = selectionRoot;
   }
 
   @JsonProperty
@@ -90,11 +92,6 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
     return formatConfig;
   }
 
-  @JsonProperty
-  public Path getSelectionRoot() {
-    return selectionRoot;
-  }
-
   @JsonIgnore
   public ParquetFormatPlugin getStorageEngine() {
     return formatPlugin;
@@ -103,7 +100,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter, tupleSchema);
   }
 
   @Override
@@ -113,7 +110,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter, tupleSchema);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index b1a686b..b8a912d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Utility class for converting parquet metadata classes to metastore metadata classes.
@@ -264,7 +265,7 @@ public class ParquetTableMetadataUtils {
    * @return map with converted row group metadata
    */
   @SuppressWarnings("unchecked")
-  private static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
+  public static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
       MetadataBase.ParquetTableMetadataBase tableMetadata, MetadataBase.RowGroupMetadata rowGroupMetadata) {
 
     Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
@@ -301,8 +302,10 @@ public class ParquetTableMetadataUtils {
           Set<SchemaPath> schemaPaths, MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
     Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
     if (parquetTableMetadata instanceof Metadata_V4.ParquetTableMetadata_v4) {
-      for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata :
-          ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap().values()) {
+      ConcurrentHashMap<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfoMap =
+        ((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap();
+      if (columnTypeInfoMap == null) { return columnsStatistics; } // may be null in some run-time pruning cases
+      for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata : columnTypeInfoMap.values()) {
         SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
         if (!schemaPaths.contains(schemaPath)) {
           Map<StatisticsKind, Object> statistics = new HashMap<>();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index ba4c493..4f1c8a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import java.util.List;
@@ -28,11 +29,8 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
 import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
@@ -42,7 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
-public class ParquetRecordReader extends AbstractRecordReader {
+public class ParquetRecordReader extends CommonParquetRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   /** Set when caller wants to read all the rows contained within the Parquet file */
@@ -60,21 +58,19 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // used for clearing the first n bits of a byte
   public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
 
-  private OperatorContext operatorContext;
+  /** Parquet Schema */
+  ParquetSchema schema;
 
   private FileSystem fileSystem;
   private final long numRecordsToRead; // number of records to read
 
   Path hadoopPath;
-  private ParquetMetadata footer;
 
   private final CodecFactory codecFactory;
   int rowGroupIndex;
-  private final FragmentContext fragmentContext;
+
   ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
-  /** Parquet Schema */
-  ParquetSchema schema;
   /** Container object for holding Parquet columnar readers state */
   ReadState readState;
   /** Responsible for managing record batch size constraints */
@@ -92,36 +88,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
   @SuppressWarnings("unused")
   private Path name;
 
-  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
   private BatchReader batchReader;
 
-  public enum Metric implements MetricDef {
-    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
-    NUM_DATA_PAGE_lOADS,         // Number of data pages read
-    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
-    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
-    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
-    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary pages
-    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
-    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
-    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
-    TIME_DICT_PAGE_LOADS,          // Time in nanos in reading dictionary pages from disk
-    TIME_DATA_PAGE_LOADS,          // Time in nanos in reading data pages from disk
-    TIME_DATA_PAGE_DECODE,         // Time in nanos in decoding data pages
-    TIME_DICT_PAGE_DECODE,         // Time in nanos in decoding dictionary pages
-    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing dictionary pages
-    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
-    TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an async disk read to complete
-    TIME_DISK_SCAN,                // Time in nanos spent in reading data from disk.
-    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed width data to value vectors
-    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting varwidth data to value vectors
-    TIME_PROCESS;                  // Time in nanos spent in processing
-
-    @Override public int metricId() {
-      return ordinal();
-    }
-  }
-
   public ParquetRecordReader(FragmentContext fragmentContext,
       Path path,
       int rowGroupIndex,
@@ -156,15 +124,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+    super(footer, fragmentContext);
 
     this.name = path;
     this.hadoopPath = path;
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
-    this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
-    this.fragmentContext = fragmentContext;
     this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
     this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
     this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
@@ -321,15 +288,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
     codecFactory.release();
 
-    if (parquetReaderStats != null) {
-      updateStats();
-      parquetReaderStats.logStats(logger, hadoopPath);
-      parquetReaderStats = null;
-    }
-  }
-
-  private void updateStats() {
-    parquetReaderStats.update(operatorContext.getStats());
+    closeStats(logger, hadoopPath);
   }
 
   @Override
@@ -338,13 +297,14 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   private int initNumRecordsToRead(long numRecordsToRead, int rowGroupIndex, ParquetMetadata footer) {
+    if (numRecordsToRead == 0) { return 0; } // run-time pruning may prune all row groups; one empty row group is still read just for its schema
+    int numRowsInRowgroup = (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
     // Callers can pass -1 if they want to read all rows.
     if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
-      return (int) footer.getBlocks().get(rowGroupIndex).getRowCount();
-    } else {
-      assert (numRecordsToRead >= 0);
-      return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
+      return numRowsInRowgroup;
     }
+    assert (numRecordsToRead > 0);
+    return (int) Math.min(numRecordsToRead, numRowsInRowgroup);
   }
 
   @Override
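
For context, the new logic above resolves the per-reader record count as follows: -1 (NUM_RECORDS_TO_READ_NOT_SPECIFIED) means read the whole row group, 0 means run-time pruning removed everything and the row group is kept only for its schema, and any other value is capped by the row group's row count. A self-contained sketch of that decision, using illustrative names rather than the Drill fields above:

    // simplified sketch of initNumRecordsToRead(); READ_ALL stands in for NUM_RECORDS_TO_READ_NOT_SPECIFIED
    static int resolveRecordsToRead(long requested, long rowsInRowGroup) {
      final long READ_ALL = -1;
      if (requested == 0) {
        return 0;                          // everything was pruned; keep a zero-row batch just for the schema
      }
      if (requested == READ_ALL) {
        return (int) rowsInRowGroup;       // read the entire row group
      }
      return (int) Math.min(requested, rowsInRowGroup);
    }
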
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
index c545862..deccde5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
@@ -80,6 +80,13 @@ public class ReadState {
       nullFilledVectors = new ArrayList<>();
     }
 
+    // When run-time pruning prunes out all the row groups, a single row group is still read
+    // with zero rows, only to obtain the schema (the rows themselves are not needed).
+    if (numRecordsToRead == 0) {
+      this.totalNumRecordsToRead = 0;
+      return;
+    }
+
     // Because of JIRA DRILL-6528, the Parquet reader is sometimes getting the wrong
     // number of rows to read. For now, returning all a file data (till
     // downstream operator stop consuming).
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index a39356b..8b0e34a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -371,7 +371,8 @@ public final class RecordBatchSizerManager {
 
   private void assignColumnsBatchMemory() {
 
-    if (getNumColumns() == 0) {
+    if (getNumColumns() == 0 ||
+        maxRecordsPerBatch == 0) { // zero when all row groups were pruned and a single one is returned only for its schema (TODO: currently not empty)
       return;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 2b0581c..59849e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -470,7 +470,7 @@ public class Metadata {
 
     @Override
     protected ParquetFileAndRowCountMetadata runInner() throws Exception {
-      return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet);
+      return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet, readerConfig);
     }
 
     public String toString() {
@@ -478,7 +478,7 @@ public class Metadata {
     }
   }
 
-  private ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
+  private static ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
     if (type.isPrimitive()) {
       PrimitiveType primitiveType = (PrimitiveType) type;
       int precision = 0;
@@ -497,7 +497,7 @@ public class Metadata {
     return getColTypeInfo(schema, t, path, depth + 1);
   }
 
-  private class ColTypeInfo {
+  private static class ColTypeInfo {
     public OriginalType originalType;
     public int precision;
     public int scale;
@@ -513,28 +513,52 @@ public class Metadata {
     }
   }
 
+  // A private convenience overload of the static method below, used when no footer is available
+  private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
+                                                           final FileStatus file, final FileSystem fs,
+                                                           boolean allColumnsInteresting, Set<String> columnSet,
+                                                           ParquetReaderConfig readerConfig)
+    throws IOException, InterruptedException {
+    return getParquetFileMetadata_v4(parquetTableMetadata, null /* no footer */, file, fs, allColumnsInteresting, false, columnSet, readerConfig);
+  }
   /**
-   * Get the metadata for a single file
+   * Get the metadata for a single Parquet file
+   *
+   * @param parquetTableMetadata The table metadata to be updated with all the columns' info
+   * @param footer If non-null, use this footer instead of reading it from the file
+   * @param file The file whose metadata is collected
+   * @param allColumnsInteresting If true, collect the min/max metadata for all the columns
+   * @param skipNonInteresting If true, collect info only for the interesting columns
+   * @param columnSet The specific columns for which min/max metadata is collected
+   * @param readerConfig The Parquet reader configuration options
+   * @return the file metadata
    */
-  private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
-                                                                   final FileStatus file, final FileSystem fs, boolean allColumnsInteresting, Set<String> columnSet) throws IOException, InterruptedException {
-    final ParquetMetadata metadata;
-    final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
-    final Configuration conf = new Configuration(fs.getConf());
+  public static ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
+                                                                         ParquetMetadata footer,
+                                                                         final FileStatus file,
+                                                                         final FileSystem fs,
+                                                                         boolean allColumnsInteresting,
+                                                                         boolean skipNonInteresting,
+                                                                         Set<String> columnSet,
+                                                                         ParquetReaderConfig readerConfig)
+    throws IOException, InterruptedException {
     Map<ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = new HashMap<>();
     long totalRowCount = 0;
-    try {
-      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
-        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
-          return parquetFileReader.getFooter();
-        }
-      });
-    } catch(Exception e) {
-      logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
-        file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
-      throw e;
+    ParquetMetadata metadata = footer; // if a non-null footer is given, no need to read it again from the file
+    if (metadata == null) {
+      final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+      final Configuration conf = new Configuration(fs.getConf());
+      try {
+        metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) () -> {
+          try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
+            return parquetFileReader.getFooter();
+          }
+        });
+      } catch (Exception e) {
+        logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}", file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
+        throw e;
+      }
     }
-
     MessageType schema = metadata.getFileMetaData().getSchema();
 
     Map<SchemaPath, ColTypeInfo> colTypeInfoMap = new HashMap<>();
@@ -560,6 +584,8 @@ public class Metadata {
       for (ColumnChunkMetaData col : rowGroup.getColumns()) {
         String[] columnName = col.getPath().toArray();
         SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
+        boolean thisColumnIsInteresting = allColumnsInteresting || columnSet == null || columnSet.contains(columnSchemaName.getRootSegmentPath());
+        if (skipNonInteresting && !thisColumnIsInteresting) { continue; }
         ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);
         Statistics<?> stats = col.getStatistics();
         long totalNullCount = stats.getNumNulls();
@@ -578,7 +604,7 @@ public class Metadata {
           long nullCount = totalNullCountMap.get(columnTypeMetadataKey) + totalNullCount;
           totalNullCountMap.put(columnTypeMetadataKey, nullCount);
         }
-        if (allColumnsInteresting || columnSet == null || !allColumnsInteresting && columnSet != null && columnSet.size() > 0 && columnSet.contains(columnSchemaName.getRootSegmentPath())) {
+        if (thisColumnIsInteresting) {
           // Save the column schema info. We'll merge it into one list
           Object minValue = null;
           Object maxValue = null;
@@ -629,7 +655,7 @@ public class Metadata {
    * @param length     the length of the row group
    * @return host affinity for the row group
    */
-  private Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
+  private static Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
       throws IOException {
     BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
     Map<String, Float> hostAffinityMap = Maps.newHashMap();
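
In short, the reworked getParquetFileMetadata_v4 lets run-time pruning pass in a footer it has already read (so the file is not read a second time), and the new skipNonInteresting flag skips statistics collection for columns the filter never references. A minimal, generic sketch of those two decisions, with illustrative names rather than the Drill signatures above:

    import java.util.Set;
    import java.util.function.Supplier;

    class FileMetadataSketch {
      // reuse a footer the caller already holds; otherwise read it from the file
      static <T> T footerOrRead(T footer, Supplier<T> readFooterFromFile) {
        return footer != null ? footer : readFooterFromFile.get();
      }

      // a column is "interesting" when all columns are requested, no explicit set was
      // given, or the column's root path is part of the requested set
      static boolean isInteresting(boolean allColumnsInteresting, Set<String> columnSet, String rootPath) {
        return allColumnsInteresting || columnSet == null || columnSet.contains(rootPath);
      }
    }
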
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
index 7023a1d..e345ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V4.java
@@ -44,6 +44,8 @@ public class Metadata_V4 {
     MetadataSummary metadataSummary = new MetadataSummary();
     FileMetadata fileMetadata = new FileMetadata();
 
+    public ParquetTableMetadata_v4() {}
+
     public ParquetTableMetadata_v4(MetadataSummary metadataSummary) {
       this.metadataSummary = metadataSummary;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 210b8fe..f338d2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -39,11 +39,11 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -66,23 +66,20 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
-public class DrillParquetReader extends AbstractRecordReader {
+public class DrillParquetReader extends CommonParquetRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
   // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
 
   private static final char DEFAULT_RECORDS_TO_READ = 32*1024;
 
-  private ParquetMetadata footer;
   private MessageType schema;
-  private DrillFileSystem fileSystem;
+  private DrillFileSystem drillFileSystem;
   private RowGroupReadEntry entry;
   private ColumnChunkIncReadStore pageReadStore;
   private RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
-  private OperatorContext operatorContext;
-  private FragmentContext fragmentContext;
   /** Configured Parquet records per batch */
   private final int recordsPerBatch;
 
@@ -100,14 +97,12 @@ public class DrillParquetReader extends AbstractRecordReader {
   // See DRILL-4203
   private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates;
 
-  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry,
-      List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+  public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+    super(footer, fragmentContext);
     this.containsCorruptedDates = containsCorruptedDates;
-    this.footer = footer;
-    this.fileSystem = fileSystem;
+    this.drillFileSystem = fileSystem;
     this.entry = entry;
     setColumns(columns);
-    this.fragmentContext = fragmentContext;
     this.recordsPerBatch = (int) fragmentContext.getOptions().getLong(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS);
   }
 
@@ -242,9 +237,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-          CodecFactory.createDirectCodecFactory(fileSystem.getConf(),
-              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(),
-          fileSystem, filePath);
+          CodecFactory.createDirectCodecFactory(drillFileSystem.getConf(),
+              new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0), operatorContext.getAllocator(), drillFileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);
@@ -322,8 +316,9 @@ public class DrillParquetReader extends AbstractRecordReader {
 
   @Override
   public void close() {
+    closeStats(logger, entry.getPath());
     footer = null;
-    fileSystem = null;
+    drillFileSystem = null;
     entry = null;
     recordReader = null;
     recordMaterializer = null;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index b2ff4a5..5096680 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -525,6 +525,7 @@ drill.exec.options: {
     exec.storage.enable_new_text_reader: true,
     exec.storage.enable_v3_text_reader: false,
     exec.storage.min_width: 1,
+    exec.storage.skip_runtime_rowgroup_pruning: false,
     exec.udf.enable_dynamic_support: true,
     exec.udf.use_dynamic: true,
     drill.exec.stats.logging.batch_size: false,
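
The new boot-time default above leaves run-time row group pruning enabled. Assuming the key is also registered as a session/system option (the ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY used by the test changes below suggests it is), it could be toggled per session roughly as follows; the statement is illustrative and not part of this patch:

    // illustrative only: skip the run-time row group pruning phase for the current session
    test("alter session set `exec.storage.skip_runtime_rowgroup_pruning` = true");
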
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index a62dc33..112cfec 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -20,17 +20,26 @@ package org.apache.drill.exec.store.parquet;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.expr.stat.RowsMatch;
 import org.apache.drill.exec.ops.FragmentContextImpl;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
 import org.apache.drill.metastore.ColumnStatistics;
 import org.apache.drill.metastore.ColumnStatisticsKind;
 import org.apache.drill.exec.expr.IsPredicate;
 import org.apache.drill.exec.expr.StatisticsProvider;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 import org.junit.AfterClass;
@@ -46,8 +55,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestParquetFilterPushDown extends PlanTestBase {
   private static final String CTAS_TABLE = "order_ctas";
@@ -356,7 +367,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
 
   @Test
   // Test against parquet files from Drill CTAS post 1.8.0 release.
-  public void testDatePredicateAgaistDrillCTASPost1_8() throws Exception {
+  public void testDatePredicateAgainstDrillCTASPost1_8() throws Exception {
     test("use dfs.tmp");
     test("create table `%s/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and " +
       "date '1992-01-03'", CTAS_TABLE);
@@ -699,4 +710,57 @@ public class TestParquetFilterPushDown extends PlanTestBase {
   private MetadataBase.ParquetTableMetadataBase getParquetMetaData(File file) throws IOException {
     return Metadata.getParquetTableMetadata(fs, file.toURI().getPath(), ParquetReaderConfig.getDefaultInstance());
   }
+
+  // =========  runtime pruning  ==========
+  @Rule
+  public final BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
+  /**
+   * Run the given query in its own cluster with run-time row group pruning enabled,
+   * then verify the returned row count and the pruning metrics in the profile.
+   */
+  private void genericTestRuntimePruning(int maxParallel, String sql, long expectedRows, int numPartitions, int numPruned) throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(baseDirTestWatcher)
+      .sessionOption(ExecConstants.SKIP_RUNTIME_ROWGROUP_PRUNING_KEY, false)
+      .sessionOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY, 0)
+      .maxParallelization(maxParallel)
+      .saveProfiles();
+
+    try (ClusterFixture cluster = builder.build();
+         ClientFixture client = cluster.clientFixture()) {
+      runAndCheckResults(client, sql, expectedRows, numPartitions, numPruned);
+    }
+  }
+  /**
+   * Test run-time row group pruning: prune two of the four row groups, then all four
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRuntimePruning() throws Exception {
+    // 's' is the partitioning key (values: 3, 4, 5, 6) -- prune out 2 of the 4 row groups
+    genericTestRuntimePruning(2, "select a from cp.`parquet/multirowgroupwithNulls.parquet` where s > 4", 20, 4, 2);
+    // prune out all the row groups
+    genericTestRuntimePruning(2, "select a from cp.`parquet/multirowgroupwithNulls.parquet` where s > 8", 0, 4, 4);
+  }
+
+  private void runAndCheckResults(ClientFixture client, String sql, long expectedRows, long numPartitions, long numPruned) throws Exception {
+    QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+
+    if (expectedRows > 0) {
+      assertEquals(expectedRows, summary.recordCount());
+    }
+
+    ProfileParser profile = client.parseProfile(summary.queryIdString());
+    List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE);
+
+    assertTrue(!ops.isEmpty());
+    // check only the first scan operator
+    ProfileParser.OperatorProfile parquetScan0 = ops.get(0);
+    long resultNumRowgroups = parquetScan0.getMetric(ParquetRecordReader.Metric.NUM_ROWGROUPS.ordinal());
+    assertEquals(numPartitions, resultNumRowgroups);
+    long resultNumPruned = parquetScan0.getMetric(ParquetRecordReader.Metric.ROWGROUPS_PRUNED.ordinal());
+    assertEquals(numPruned, resultNumPruned);
+  }
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
index 6ac08ee..f45912f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningWithItemStar.java
@@ -79,7 +79,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
   public void testPushProjectIntoScanWithExpressionInFilter() throws Exception {
     String query = String.format("select o_orderdate from (select * from `%s`.`%s`) where o_custkey + o_orderkey < 5", DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=3, numRowGroups=3, usedMetadataFile=false, columns=\\[`o_orderdate`, `o_custkey`, `o_orderkey`\\]"};
+    String[] expectedPlan = {"numFiles=3, numRowGroups=3, usedMetadataFile=false,.* columns=\\[`o_orderdate`, " +
+      "`o_custkey`, `o_orderkey`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -115,7 +116,7 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String query = "select t.trans_id from (select * from cp.`store/parquet/complex/complex.parquet`) t " +
       "where t.user_info.cust_id > 28 and t.user_info.device = 'IOS5' and t.marketing_info.camp_id > 5 and t.marketing_info.keywords[2] is not null";
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, " +
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* " +
       "columns=\\[`trans_id`, `user_info`.`cust_id`, `user_info`.`device`, `marketing_info`.`camp_id`, `marketing_info`.`keywords`\\[2\\]\\]"};
     String[] excludedPlan = {};
 
@@ -218,7 +219,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
   public void testFilterPushDownSingleCondition() throws Exception {
     String query = String.format("select * from (select * from `%s`.`%s`) where o_orderdate = date '1992-01-01'", DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -235,7 +237,7 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String query = String.format("select * from (select * from `%s`.`%s`) where o_orderdate = date '1992-01-01' or o_orderdate = date '1992-01-09'",
         DFS_TMP_SCHEMA, TABLE_NAME);
 
-    String[] expectedPlan = {"numFiles=2, numRowGroups=2, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=2, numRowGroups=2, usedMetadataFile=false,.* columns=\\[`\\*\\*`, `o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -253,7 +255,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String subQuery = String.format("select * from `%s`.`%s`", DFS_TMP_SCHEMA, TABLE_NAME);
     String query = String.format("select * from (select * from (select * from (%s))) where o_orderdate = date '1992-01-01'", subQuery);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);
@@ -270,7 +273,8 @@ public class TestPushDownAndPruningWithItemStar extends PlanTestBase {
     String subQuery = String.format("select * from `%s`.`%s`", DFS_TMP_SCHEMA, TABLE_NAME);
     String query = String.format("select * from (select * from (select *, o_custkey from (%s))) where o_orderdate = date '1992-01-01'", subQuery);
 
-    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false, columns=\\[`\\*\\*`, `o_custkey`, `o_orderdate`\\]"};
+    String[] expectedPlan = {"numFiles=1, numRowGroups=1, usedMetadataFile=false,.* columns=\\[`\\*\\*`, " +
+      "`o_custkey`, `o_orderdate`\\]"};
     String[] excludedPlan = {};
 
     PlanTestBase.testPlanMatchingPatterns(query, expectedPlan, excludedPlan);