You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/11/15 23:51:51 UTC

[GitHub] asfgit closed pull request #1537: DRILL-6744: Support varchar and decimal push down

asfgit closed pull request #1537: DRILL-6744: Support varchar and decimal push down
URL: https://github.com/apache/drill/pull/1537
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index fc8a0576cd3..b5cff58076f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -131,7 +131,7 @@ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
   @Override
   public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
       List<SchemaPath> columns) throws IOException {
-    return getGroupScan(userName, selection, columns, null /* indexDesc */);
+    return getGroupScan(userName, selection, columns, (IndexDesc) null /* indexDesc */);
   }
 
   @JsonIgnore
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index ea711572dc8..7286d7a245a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -32,11 +32,13 @@
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -97,7 +99,7 @@ public void onMatch(RelOptRuleCall call) {
       }
 
       final Map<String, String> partitionColMapping = getPartitionColMapping(hiveTable, partitionColumnLabel);
-      final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits);
+      final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits, settings.getOptions());
       if (hiveScanRel.getRowType().getFieldCount() == 0) {
         call.transformTo(nativeScanRel);
       } else {
@@ -138,7 +140,8 @@ Hive does not always give correct costing (i.e. for external tables Hive does no
    */
   private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping,
                                            final DrillScanRel hiveScanRel,
-                                           final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits) throws Exception {
+                                           final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits,
+                                           final OptionManager options) throws Exception {
 
     final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
     final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
@@ -178,7 +181,8 @@ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColM
             nativeScanCols,
             hiveScan.getStoragePlugin(),
             logicalInputSplits,
-            hiveScan.getConfProperties());
+            hiveScan.getConfProperties(),
+            ParquetReaderConfig.builder().withOptions(options).build());
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index 2e8f95dc9ef..bea06e0733b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -56,6 +57,7 @@ public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry r
                                             @JsonProperty("columns") List<SchemaPath> columns,
                                             @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                             @JsonProperty("confProperties") Map<String, String> confProperties,
+                                            @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
@@ -63,6 +65,7 @@ public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry r
         columns,
         hivePartitionHolder,
         confProperties,
+        readerConfig,
         filter);
   }
 
@@ -72,8 +75,9 @@ public HiveDrillNativeParquetRowGroupScan(String userName,
                                             List<SchemaPath> columns,
                                             HivePartitionHolder hivePartitionHolder,
                                             Map<String, String> confProperties,
+                                            ParquetReaderConfig readerConfig,
                                             LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, filter);
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
@@ -103,7 +107,8 @@ public HiveStoragePlugin getHiveStoragePlugin() {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
+      confProperties, readerConfig, filter);
   }
 
   @Override
@@ -113,12 +118,8 @@ public int getOperatorType() {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
-  }
-
-  @Override
-  public boolean areCorruptDatesAutoCorrected() {
-    return true;
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
+      confProperties, readerConfig, filter);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 1db8df2f886..617f6a59f4f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -37,7 +38,6 @@
 import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.RowGroupInfo;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -73,8 +73,9 @@ public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineReg
                                     @JsonProperty("entries") List<ReadEntryWithPath> entries,
                                     @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                     @JsonProperty("confProperties") Map<String, String> confProperties,
+                                    @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                                     @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
-    super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
     this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
     this.hivePartitionHolder = hivePartitionHolder;
     this.confProperties = confProperties;
@@ -86,8 +87,9 @@ public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
-                                    Map<String, String> confProperties) throws IOException {
-    this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE);
+                                    Map<String, String> confProperties,
+                                    ParquetReaderConfig readerConfig) throws IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, readerConfig, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public HiveDrillNativeParquetScan(String userName,
@@ -95,8 +97,9 @@ public HiveDrillNativeParquetScan(String userName,
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
                                     Map<String, String> confProperties,
+                                    ParquetReaderConfig readerConfig,
                                     LogicalExpression filter) throws IOException {
-    super(userName, columns, new ArrayList<>(), filter);
+    super(userName, columns, new ArrayList<>(), readerConfig, filter);
 
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.hivePartitionHolder = new HivePartitionHolder();
@@ -154,7 +157,8 @@ public SubScan getSpecificScan(int minorFragmentId) {
       List<String> values = hivePartitionHolder.get(readEntry.getPath());
       subPartitionHolder.add(readEntry.getPath(), values);
     }
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder,
+      confProperties, readerConfig, filter);
   }
 
   @Override
@@ -199,7 +203,6 @@ public String toString() {
 
   @Override
   protected void initInternal() throws IOException {
-    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
     Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
     for (ReadEntryWithPath entry : entries) {
       Path path = new Path(entry.getPath());
@@ -209,7 +212,7 @@ protected void initInternal() throws IOException {
       FileSystem fs = path.getFileSystem(conf);
       fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
     }
-    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, formatConfig);
+    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, readerConfig);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index ea8d5df8412..1a02eb48f81 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -21,6 +21,8 @@
 import static org.junit.Assert.assertEquals;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.HiveStorageTest;
@@ -260,4 +262,43 @@ public void testHiveConfPropertiesAtSessionLevel() throws Exception {
     }
   }
 
+  @Test
+  public void testHiveVarcharPushDown() throws Exception {
+    String query = "select int_key from hive.kv_native where var_key = 'var_1'";
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("true", "numRowGroups=1");
+    properties.put("false", "numRowGroups=4"); // Hive creates parquet files using Parquet lib older than 1.10.0
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+        testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", property.getValue()});
+
+        testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("int_key")
+          .baselineValues(1)
+          .go();
+      }
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+    }
+  }
+
+  @Test
+  public void testHiveDecimalPushDown() throws Exception {
+    String query = "select int_key from hive.kv_native where dec_key = cast(1.11 as decimal(5, 2))";
+    // Hive generates parquet files using parquet lib older than 1.10.0
+    // thus statistics for decimal is not available
+    testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numRowGroups=4"});
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("int_key")
+      .baselineValues(1)
+      .go();
+  }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 65d1700fbaa..84fa368c8b9 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -552,12 +552,12 @@ private void generateTestData() throws Exception {
 
   private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
     // Hive managed table that has data qualified for Drill native filter push down
-    executeQuery(hiveDriver, "create table kv_native(key int, sub_key int) stored as parquet");
+    executeQuery(hiveDriver, "create table kv_native(key int, int_key int, var_key varchar(10), dec_key decimal(5, 2)) stored as parquet");
     // each insert is created in separate file
-    executeQuery(hiveDriver, "insert into table kv_native values (1, 1), (1, 2)");
-    executeQuery(hiveDriver, "insert into table kv_native values (1, 3), (1, 4)");
-    executeQuery(hiveDriver, "insert into table kv_native values (2, 5), (2, 6)");
-    executeQuery(hiveDriver, "insert into table kv_native values (null, 9), (null, 10)");
+    executeQuery(hiveDriver, "insert into table kv_native values (1, 1, 'var_1', 1.11), (1, 2, 'var_2', 2.22)");
+    executeQuery(hiveDriver, "insert into table kv_native values (1, 3, 'var_3', 3.33), (1, 4, 'var_4', 4.44)");
+    executeQuery(hiveDriver, "insert into table kv_native values (2, 5, 'var_5', 5.55), (2, 6, 'var_6', 6.66)");
+    executeQuery(hiveDriver, "insert into table kv_native values (null, 7, 'var_7', 7.77), (null, 8, 'var_8', 8.88)");
 
     // Hive external table which has three partitions
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index eed4ff830b6..d4f9aafbc4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -333,6 +333,14 @@ private ExecConstants() {
   public static final OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP,
       new OptionDescription("Enables Drill to implicitly interpret the INT96 timestamp data type in Parquet files."));
 
+  public static final String PARQUET_READER_STRINGS_SIGNED_MIN_MAX = "store.parquet.reader.strings_signed_min_max";
+  public static final StringValidator PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR = new EnumeratedStringValidator(PARQUET_READER_STRINGS_SIGNED_MIN_MAX,
+    new OptionDescription("Allows binary statistics usage for files created prior to 1.9.1 parquet library version where " +
+      "statistics was incorrectly calculated for UTF-8 data. For cases when user exactly knows " +
+      "that data in binary columns is in ASCII (not UTF-8), turning this property to 'true' " +
+      "enables statistics usage for varchar and decimal data types. Default is unset, i.e. empty string. " +
+      "Allowed values: 'true', 'false', '' (empty string)."), "true", "false", "");
+
   public static final String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
   public static final OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC,
       new OptionDescription("Enable the asynchronous page reader. This pipelines the reading of data from disk for high performance."));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 531cbabe52f..17bf8515668 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -20,9 +20,13 @@
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.LogicalExpressionBase;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.parquet.column.statistics.Statistics;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -100,9 +104,45 @@ public RowsMatch matches(RangeExprEvaluator<C> evaluator) {
     if (!leftStat.hasNonNullValue() || !rightStat.hasNonNullValue()) {
       return RowsMatch.SOME;
     }
+
+    if (left.getMajorType().getMinorType() == TypeProtos.MinorType.VARDECIMAL) {
+      /*
+        to compare correctly two decimal statistics we need to ensure that min and max values have the same scale,
+        otherwise adjust statistics to the highest scale
+        since decimal value is stored as unscaled we need to move dot to the right on the difference between scales
+       */
+      int leftScale = left.getMajorType().getScale();
+      int rightScale = right.getMajorType().getScale();
+      if (leftScale > rightScale) {
+        rightStat = adjustDecimalStatistics(rightStat, leftScale - rightScale);
+      } else if (leftScale < rightScale) {
+        leftStat = adjustDecimalStatistics(leftStat, rightScale - leftScale);
+      }
+    }
+
     return predicate.apply(leftStat, rightStat);
   }
 
+  /**
+   * Creates decimal statistics where min and max values are re-created using given scale.
+   *
+   * @param statistics statistics that needs to be adjusted
+   * @param scale adjustment scale
+   * @return adjusted statistics
+   */
+  @SuppressWarnings("unchecked")
+  private Statistics<C> adjustDecimalStatistics(Statistics<C> statistics, int scale) {
+    byte[] minBytes = new BigDecimal(new BigInteger(statistics.getMinBytes()))
+      .setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+    byte[] maxBytes = new BigDecimal(new BigInteger(statistics.getMaxBytes()))
+      .setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+    return (Statistics<C>) Statistics.getBuilderForReading(statistics.type())
+        .withMin(minBytes)
+        .withMax(maxBytes)
+        .withNumNulls(statistics.getNumNulls())
+        .build();
+  }
+
   /**
    * If one rowgroup contains some null values, change the RowsMatch.ALL into RowsMatch.SOME (null values should be discarded by filter)
    */
@@ -117,9 +157,22 @@ private static RowsMatch checkNull(Statistics leftStat, Statistics rightStat) {
       LogicalExpression left,
       LogicalExpression right
   ) {
-    return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) ->
-      leftStat.compareMaxToValue(rightStat.genericGetMin()) < 0 || rightStat.compareMaxToValue(leftStat.genericGetMin()) < 0 ? RowsMatch.NONE : RowsMatch.SOME
-    ) {
+    return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
+      // compare left max and right min
+      int leftToRightComparison = leftStat.compareMaxToValue(rightStat.genericGetMin());
+      // compare right max and left min
+      int rightToLeftComparison = rightStat.compareMaxToValue(leftStat.genericGetMin());
+
+      // if both comparison results are equal to 0 and both statistics have no nulls,
+      // it means that min and max values in each statistics are the same and match each other,
+      // return that all rows match the condition
+      if (leftToRightComparison == 0 && rightToLeftComparison == 0 && hasNoNulls(leftStat) && hasNoNulls(rightStat)) {
+        return RowsMatch.ALL;
+      }
+
+      // if at least one comparison result is negative, it means that none of the rows match the condition
+      return leftToRightComparison < 0 || rightToLeftComparison < 0 ? RowsMatch.NONE : RowsMatch.SOME;
+    }) {
       @Override
       public String toString() {
         return left + " = " + right;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java
index 48a7a90b42b..aa7af42b541 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java
@@ -39,24 +39,28 @@
 import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
 import org.apache.drill.exec.vector.ValueHolderHelper;
+import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.DoubleStatistics;
 import org.apache.parquet.column.statistics.FloatStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVisitor<Statistics<T>, Void, RuntimeException> {
-  static final Logger logger = LoggerFactory.getLogger(RangeExprEvaluator.class);
+  private static final Logger logger = LoggerFactory.getLogger(RangeExprEvaluator.class);
 
   private final Map<SchemaPath, ColumnStatistics<T>> columnStatMap;
   private final long rowCount;
@@ -134,6 +138,18 @@ public long getRowCount() {
     return getStatistics(milliSeconds);
   }
 
+  @Override
+  public Statistics<T> visitQuotedStringConstant(ValueExpressions.QuotedString quotedString, Void value) throws RuntimeException {
+    String stringValue = quotedString.getString();
+    return getStatistics(stringValue);
+  }
+
+  @Override
+  public Statistics<T> visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) throws RuntimeException {
+    DecimalMetadata decimalMeta = new DecimalMetadata(decExpr.getMajorType().getPrecision(), decExpr.getMajorType().getScale());
+    return getStatistics(decExpr.getBigDecimal(), decimalMeta);
+  }
+
   @Override
   public Statistics<T> visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Void value) throws RuntimeException {
     FuncHolder funcHolder = holderExpr.getHolder();
@@ -161,7 +177,7 @@ public long getRowCount() {
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(int min, int max) {
     final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.INT32);
-    ((IntStatistics)statistics).setMinMax(min, max);
+    ((IntStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -172,7 +188,7 @@ public long getRowCount() {
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(boolean min, boolean max) {
     Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.BOOLEAN);
-    ((BooleanStatistics)statistics).setMinMax(min, max);
+    ((BooleanStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -183,7 +199,7 @@ public long getRowCount() {
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(long min, long max) {
     final Statistics statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.INT64);
-    ((LongStatistics)statistics).setMinMax(min, max);
+    ((LongStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -194,7 +210,7 @@ public long getRowCount() {
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(double min, double max) {
     final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.DOUBLE);
-    ((DoubleStatistics)statistics).setMinMax(min, max);
+    ((DoubleStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -205,10 +221,40 @@ public long getRowCount() {
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(float min, float max) {
     final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.FLOAT);
-    ((FloatStatistics)statistics).setMinMax(min, max);
+    ((FloatStatistics) statistics).setMinMax(min, max);
+    return statistics;
+  }
+
+  private Statistics<T> getStatistics(String value) {
+    return getStatistics(value, value);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Statistics<T> getStatistics(String min, String max) {
+    final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.BINARY);
+    ((BinaryStatistics) statistics).setMinMaxFromBytes(min.getBytes(), max.getBytes());
     return statistics;
   }
 
+  private Statistics<T> getStatistics(BigDecimal value, DecimalMetadata decimalMetadata) {
+    return getStatistics(value, value, decimalMetadata);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Statistics<T> getStatistics(BigDecimal min, BigDecimal max, DecimalMetadata decimalMetadata) {
+    PrimitiveType decimalType = org.apache.parquet.schema.Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+      .as(OriginalType.DECIMAL)
+      .precision(decimalMetadata.getPrecision())
+      .scale(decimalMetadata.getScale())
+      .named("decimal_type");
+
+    return (Statistics<T>) Statistics.getBuilderForReading(decimalType)
+        .withMin(min.unscaledValue().toByteArray())
+        .withMax(max.unscaledValue().toByteArray())
+        .withNumNulls(0)
+        .build();
+  }
+
   private Statistics<T> evalCastFunc(FunctionHolderExpression holderExpr, Statistics input) {
     try {
       DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
@@ -288,31 +334,31 @@ public long getRowCount() {
   private static final Map<TypeProtos.MinorType, Set<TypeProtos.MinorType>> CAST_FUNC = new HashMap<>();
   static {
     // float -> double , int, bigint
-    CAST_FUNC.put(TypeProtos.MinorType.FLOAT4, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.FLOAT4, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT4).add(TypeProtos.MinorType.FLOAT8);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT4).add(TypeProtos.MinorType.INT);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT4).add(TypeProtos.MinorType.BIGINT);
 
     // double -> float, int, bigint
-    CAST_FUNC.put(TypeProtos.MinorType.FLOAT8, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.FLOAT8, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT8).add(TypeProtos.MinorType.FLOAT4);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT8).add(TypeProtos.MinorType.INT);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT8).add(TypeProtos.MinorType.BIGINT);
 
     // int -> float, double, bigint
-    CAST_FUNC.put(TypeProtos.MinorType.INT, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.INT, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.INT).add(TypeProtos.MinorType.FLOAT4);
     CAST_FUNC.get(TypeProtos.MinorType.INT).add(TypeProtos.MinorType.FLOAT8);
     CAST_FUNC.get(TypeProtos.MinorType.INT).add(TypeProtos.MinorType.BIGINT);
 
     // bigint -> int, float, double
-    CAST_FUNC.put(TypeProtos.MinorType.BIGINT, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.BIGINT, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.BIGINT).add(TypeProtos.MinorType.INT);
     CAST_FUNC.get(TypeProtos.MinorType.BIGINT).add(TypeProtos.MinorType.FLOAT4);
     CAST_FUNC.get(TypeProtos.MinorType.BIGINT).add(TypeProtos.MinorType.FLOAT8);
 
     // date -> timestamp
-    CAST_FUNC.put(TypeProtos.MinorType.DATE, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.DATE, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.DATE).add(TypeProtos.MinorType.TIMESTAMP);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index f463e6ddc0f..4684251e988 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -19,13 +19,9 @@
 
 import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
 
-import java.io.IOException;
-
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -37,6 +33,7 @@
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -58,7 +55,7 @@ private PhysicalPlan notSupported(String tbl){
   }
 
   @Override
-  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
     final SqlRefreshMetadata refreshTable = unwrap(sqlNode, SqlRefreshMetadata.class);
 
     try {
@@ -119,7 +116,12 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv
       if (!(formatConfig instanceof ParquetFormatConfig)) {
         formatConfig = new ParquetFormatConfig();
       }
-      Metadata.createMeta(fs, selectionRoot, (ParquetFormatConfig) formatConfig);
+
+      ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+        .withFormatConfig((ParquetFormatConfig) formatConfig)
+        .withOptions(context.getOptions())
+        .build();
+      Metadata.createMeta(fs, selectionRoot, readerConfig);
       return direct(true, "Successfully updated metadata for table %s.", tableName);
 
     } catch(Exception e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 1d0bca06127..097b23111c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -168,6 +168,7 @@
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index f053986f309..dd1c91c9b32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -38,6 +38,7 @@
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
@@ -163,11 +164,20 @@ public StoragePluginConfig getConfig() {
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
     FormatSelection formatSelection = selection.getWith(lpPersistance, FormatSelection.class);
     FormatPlugin plugin = getFormatPlugin(formatSelection.getFormat());
-    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns);
+    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index ed3b602144a..27a72e3e10e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -28,6 +28,7 @@
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.hadoop.conf.Configuration;
 
@@ -36,28 +37,32 @@
  */
 public interface FormatPlugin {
 
-  public boolean supportsRead();
+  boolean supportsRead();
 
-  public boolean supportsWrite();
+  boolean supportsWrite();
 
   /**
    * Indicates whether this FormatPlugin supports auto-partitioning for CTAS statements
    * @return true if auto-partitioning is supported
    */
-  public boolean supportsAutoPartitioning();
+  boolean supportsAutoPartitioning();
 
-  public FormatMatcher getMatcher();
+  FormatMatcher getMatcher();
 
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
+  AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
 
-  public Set<StoragePluginOptimizerRule> getOptimizerRules();
+  Set<StoragePluginOptimizerRule> getOptimizerRules();
 
-  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
+  AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
 
-  public FormatPluginConfig getConfig();
-  public StoragePluginConfig getStorageConfig();
-  public Configuration getFsConf();
-  public DrillbitContext getContext();
-  public String getName();
+  default AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+    return getGroupScan(userName, selection, columns);
+  }
+
+  FormatPluginConfig getConfig();
+  StoragePluginConfig getStorageConfig();
+  Configuration getFsConf();
+  DrillbitContext getContext();
+  String getName();
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 1b96e5881f2..9bc969f035b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
@@ -80,14 +81,20 @@
   protected List<RowGroupInfo> rowGroupInfos;
   protected ListMultimap<Integer, RowGroupInfo> mappings;
   protected Set<String> fileSet;
+  protected ParquetReaderConfig readerConfig;
 
   private List<EndpointAffinity> endpointAffinities;
   private ParquetGroupScanStatistics parquetGroupScanStatistics;
 
-  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
+  protected AbstractParquetGroupScan(String userName,
+                                     List<SchemaPath> columns,
+                                     List<ReadEntryWithPath> entries,
+                                     ParquetReaderConfig readerConfig,
+                                     LogicalExpression filter) {
     super(userName);
     this.columns = columns;
     this.entries = entries;
+    this.readerConfig = readerConfig == null ? ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
   }
 
@@ -103,6 +110,7 @@ protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
     this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
     this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
     this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
+    this.readerConfig = that.readerConfig;
   }
 
   @JsonProperty
@@ -115,6 +123,18 @@ protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
     return entries;
   }
 
+  @JsonProperty("readerConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  // do not serialize reader config if it contains all default values
+  public ParquetReaderConfig getReaderConfigForSerialization() {
+    return ParquetReaderConfig.getDefaultInstance().equals(readerConfig) ? null : readerConfig;
+  }
+
+  @JsonIgnore
+  public ParquetReaderConfig getReaderConfig() {
+    return readerConfig;
+  }
+
   @JsonIgnore
   @Override
   public Collection<String> getFiles() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 8726b9d1095..52b2baa0c36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -37,15 +38,18 @@
 
   protected final List<RowGroupReadEntry> rowGroupReadEntries;
   protected final List<SchemaPath> columns;
+  protected final ParquetReaderConfig readerConfig;
   protected final LogicalExpression filter;
 
   protected AbstractParquetRowGroupScan(String userName,
                                      List<RowGroupReadEntry> rowGroupReadEntries,
                                      List<SchemaPath> columns,
+                                     ParquetReaderConfig readerConfig,
                                      LogicalExpression filter) {
     super(userName);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
+    this.readerConfig = readerConfig == null ? ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
   }
 
@@ -59,6 +63,18 @@ protected AbstractParquetRowGroupScan(String userName,
     return columns;
   }
 
+  @JsonProperty("readerConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  // do not serialize reader config if it contains all default values
+  public ParquetReaderConfig getReaderConfigForSerialization() {
+    return ParquetReaderConfig.getDefaultInstance().equals(readerConfig) ? null : readerConfig;
+  }
+
+  @JsonIgnore
+  public ParquetReaderConfig getReaderConfig() {
+    return readerConfig;
+  }
+
   @JsonProperty
   public LogicalExpression getFilter() {
     return filter;
@@ -80,7 +96,6 @@ public boolean isExecutable() {
   }
 
   public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
-  public abstract boolean areCorruptDatesAutoCorrected();
   @JsonIgnore
   public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
   public abstract boolean supportsFileImplicitColumns();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index b7f9cdbb87d..99161fd592d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -30,12 +30,10 @@
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -50,17 +48,10 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-
 public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
 
-  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
-  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
-  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
-
   protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
     final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
 
@@ -87,12 +78,13 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
       try {
         Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
         DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
+        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
         if (!footers.containsKey(rowGroup.getPath())) {
           if (timer != null) {
             timer.start();
           }
 
-          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath());
+          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
           if (timer != null) {
             long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
             logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
@@ -101,10 +93,9 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
         }
         ParquetMetadata footer = footers.get(rowGroup.getPath());
 
-        boolean autoCorrectCorruptDates = rowGroupScan.areCorruptDatesAutoCorrected();
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
-          ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
-        logger.debug("Contains corrupt dates: {}", containsCorruptDates);
+        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
+          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
+        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
 
         boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
         boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
@@ -149,7 +140,7 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
     }
 
     // all readers should have the same number of implicit columns, add missing ones with value null
-    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant(null));
     for (Map<String, String> map : implicitColumns) {
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
@@ -159,14 +150,9 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
 
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
-  private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
-    conf = new Configuration(conf);
-    conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
-    conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
-    conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
-    conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
-    ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
-    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+  private ParquetMetadata readFooter(Configuration conf, String path, ParquetReaderConfig readerConfig) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path),
+      readerConfig.addCountersToConf(conf)), readerConfig.toReadOptions())) {
       return reader.getFooter();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
index ad3884920ab..f0f1029119e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.FunctionHolderExpression;
@@ -144,6 +146,11 @@ public LogicalExpression visitBooleanConstant(ValueExpressions.BooleanExpression
     return booleanExpression;
   }
 
+  @Override
+  public LogicalExpression visitQuotedStringConstant(ValueExpressions.QuotedString quotedString, Set<LogicalExpression> value) throws RuntimeException {
+    return quotedString;
+  }
+
   @Override
   public LogicalExpression visitBooleanOperator(BooleanOperator op, Set<LogicalExpression> value) {
     List<LogicalExpression> childPredicates = new ArrayList<>();
@@ -176,31 +183,35 @@ public LogicalExpression visitBooleanOperator(BooleanOperator op, Set<LogicalExp
 
   private LogicalExpression getValueExpressionFromConst(ValueHolder holder, TypeProtos.MinorType type) {
     switch (type) {
-    case INT:
-      return ValueExpressions.getInt(((IntHolder) holder).value);
-    case BIGINT:
-      return ValueExpressions.getBigInt(((BigIntHolder) holder).value);
-    case FLOAT4:
-      return ValueExpressions.getFloat4(((Float4Holder) holder).value);
-    case FLOAT8:
-      return ValueExpressions.getFloat8(((Float8Holder) holder).value);
-    case VARDECIMAL:
-      VarDecimalHolder decimalHolder = (VarDecimalHolder) holder;
-      return ValueExpressions.getVarDecimal(
-          DecimalUtility.getBigDecimalFromDrillBuf(decimalHolder.buffer,
-              decimalHolder.start, decimalHolder.end - decimalHolder.start, decimalHolder.scale),
-          decimalHolder.precision,
-          decimalHolder.scale);
-    case DATE:
-      return ValueExpressions.getDate(((DateHolder) holder).value);
-    case TIMESTAMP:
-      return ValueExpressions.getTimeStamp(((TimeStampHolder) holder).value);
-    case TIME:
-      return ValueExpressions.getTime(((TimeHolder) holder).value);
-    case BIT:
-      return ValueExpressions.getBit(((BitHolder) holder).value == 1);
-    default:
-      return null;
+      case INT:
+        return ValueExpressions.getInt(((IntHolder) holder).value);
+      case BIGINT:
+        return ValueExpressions.getBigInt(((BigIntHolder) holder).value);
+      case FLOAT4:
+        return ValueExpressions.getFloat4(((Float4Holder) holder).value);
+      case FLOAT8:
+        return ValueExpressions.getFloat8(((Float8Holder) holder).value);
+      case VARDECIMAL:
+        VarDecimalHolder decimalHolder = (VarDecimalHolder) holder;
+        return ValueExpressions.getVarDecimal(
+            DecimalUtility.getBigDecimalFromDrillBuf(decimalHolder.buffer,
+                decimalHolder.start, decimalHolder.end - decimalHolder.start, decimalHolder.scale),
+            decimalHolder.precision,
+            decimalHolder.scale);
+      case DATE:
+        return ValueExpressions.getDate(((DateHolder) holder).value);
+      case TIMESTAMP:
+        return ValueExpressions.getTimeStamp(((TimeStampHolder) holder).value);
+      case TIME:
+        return ValueExpressions.getTime(((TimeHolder) holder).value);
+      case BIT:
+        return ValueExpressions.getBit(((BitHolder) holder).value == 1);
+      case VARCHAR:
+        VarCharHolder varCharHolder = (VarCharHolder) holder;
+        String value = StringFunctionHelpers.toStringFromUTF8(varCharHolder.start, varCharHolder.end, varCharHolder.buffer);
+        return ValueExpressions.getChar(value, value.length());
+      default:
+        return null;
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 9d4c453a703..2ff8415687b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -27,6 +27,7 @@
 public class ParquetFormatConfig implements FormatPluginConfig {
 
   public boolean autoCorrectCorruptDates = true;
+  public boolean enableStringsSignedMinMax = false;
 
   /**
    * @return true if auto correction of corrupt dates is enabled, false otherwise
@@ -36,6 +37,21 @@ public boolean areCorruptDatesAutoCorrected() {
     return autoCorrectCorruptDates;
   }
 
+  /**
+   * Parquet statistics for UTF-8 data for files created prior to 1.9.1 parquet library version was stored incorrectly.
+   * If user exactly knows that data in binary columns is in ASCII (not UTF-8), turning this property to 'true'
+   * enables statistics usage for varchar and decimal columns.
+   *
+   * Can be overridden for individual tables using
+   * @link org.apache.drill.exec.ExecConstants#PARQUET_READER_STRINGS_SIGNED_MIN_MAX} session option.
+   *
+   * @return true if string signed min max enabled, false otherwise
+   */
+  @JsonIgnore
+  public boolean isStringsSignedMinMaxEnabled() {
+    return enableStringsSignedMinMax;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -47,12 +63,25 @@ public boolean equals(Object o) {
 
     ParquetFormatConfig that = (ParquetFormatConfig) o;
 
-    return autoCorrectCorruptDates == that.autoCorrectCorruptDates;
+    if (autoCorrectCorruptDates != that.autoCorrectCorruptDates) {
+      return false;
+    }
 
+    return enableStringsSignedMinMax == that.enableStringsSignedMinMax;
   }
 
   @Override
   public int hashCode() {
-    return (autoCorrectCorruptDates ? 1231 : 1237);
+    int result = (autoCorrectCorruptDates ? 1231 : 1237);
+    result = 31 * result + (enableStringsSignedMinMax ? 1231 : 1237);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ParquetFormatConfig{"
+      + "autoCorrectCorruptDates=" + autoCorrectCorruptDates
+      + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+      + '}';
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9d828d20e7b..2c409963a0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -40,6 +40,7 @@
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -67,7 +68,7 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
-public class ParquetFormatPlugin implements FormatPlugin{
+public class ParquetFormatPlugin implements FormatPlugin {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -175,9 +176,17 @@ public WriterRecordBatch getWriterBatch(FragmentContext context, RecordBatch inc
   }
 
   @Override
-  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
-      throws IOException {
-    ParquetGroupScan parquetGroupScan = new ParquetGroupScan(userName, selection, this, columns);
+  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return getGroupScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+      .withFormatConfig(getConfig())
+      .withOptions(options)
+      .build();
+    ParquetGroupScan parquetGroupScan = new ParquetGroupScan(userName, selection, this, columns, readerConfig);
     if (parquetGroupScan.getEntries().isEmpty()) {
       // If ParquetGroupScan does not contain any entries, it means selection directories are empty and
       // metadata cache files are invalid, return schemaless scan
@@ -211,7 +220,7 @@ public FormatMatcher getMatcher() {
     return formatMatcher;
   }
 
-  private static class ParquetFormatMatcher extends BasicFormatMatcher{
+  private static class ParquetFormatMatcher extends BasicFormatMatcher {
 
     private final ParquetFormatConfig formatConfig;
 
@@ -229,7 +238,7 @@ public boolean supportDirectoryReads() {
     public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
         FileSystemPlugin fsPlugin, String storageEngineName, SchemaConfig schemaConfig)
         throws IOException {
-      if(selection.containsDirectories(fs)) {
+      if (selection.containsDirectories(fs)) {
         Path dirMetaPath = new Path(selection.getSelectionRoot(), Metadata.METADATA_DIRECTORIES_FILENAME);
         // check if the metadata 'directories' file exists; if it does, there is an implicit assumption that
         // the directory is readable since the metadata 'directories' file cannot be created otherwise.  Note
@@ -238,7 +247,8 @@ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
           // create a metadata context that will be used for the duration of the query for this table
           MetadataContext metaContext = new MetadataContext();
 
-          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath, metaContext, formatConfig);
+          ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
+          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath, metaContext, readerConfig);
           if (mDirs != null && mDirs.getDirectories().size() > 0) {
             FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
                 selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
@@ -249,7 +259,7 @@ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
                 new FormatSelection(plugin.getConfig(), dirSelection));
           }
         }
-        if(isDirReadable(fs, selection.getFirstPath(fs))) {
+        if (isDirReadable(fs, selection.getFirstPath(fs))) {
           return new DynamicDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
               new FormatSelection(plugin.getConfig(), selection));
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 7fe665d31d6..a1d9f518daf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -80,8 +80,9 @@ public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry,
                           @JsonProperty("columns") List<SchemaPath> columns,
                           @JsonProperty("selectionRoot") String selectionRoot,
                           @JsonProperty("cacheFileRoot") String cacheFileRoot,
+                          @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                           @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
-    super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
     Preconditions.checkNotNull(storageConfig);
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -98,16 +99,18 @@ public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry,
   public ParquetGroupScan(String userName,
                           FileSelection selection,
                           ParquetFormatPlugin formatPlugin,
-                          List<SchemaPath> columns) throws IOException {
-    this(userName, selection, formatPlugin, columns, ValueExpressions.BooleanExpression.TRUE);
+                          List<SchemaPath> columns,
+                          ParquetReaderConfig readerConfig) throws IOException {
+    this(userName, selection, formatPlugin, columns, readerConfig, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public ParquetGroupScan(String userName,
                           FileSelection selection,
                           ParquetFormatPlugin formatPlugin,
                           List<SchemaPath> columns,
+                          ParquetReaderConfig readerConfig,
                           LogicalExpression filter) throws IOException {
-    super(userName, columns, new ArrayList<>(), filter);
+    super(userName, columns, new ArrayList<>(), readerConfig, filter);
 
     this.formatPlugin = formatPlugin;
     this.formatConfig = formatPlugin.getConfig();
@@ -178,7 +181,7 @@ public String getCacheFileRoot() {
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter);
   }
 
   @Override
@@ -245,13 +248,13 @@ protected void initInternal() throws IOException {
         metaPath = new Path(p, Metadata.METADATA_FILENAME);
       }
       if (!metaContext.isMetadataCacheCorrupted() && metaPath != null && fs.exists(metaPath)) {
-        parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig);
+        parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
         if (parquetTableMetadata != null) {
           usedMetadataCache = true;
         }
       }
       if (!usedMetadataCache) {
-        parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), formatConfig);
+        parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig);
       }
     } else {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
@@ -259,7 +262,7 @@ protected void initInternal() throws IOException {
       if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot))
           && fs.exists(metaPath)) {
         if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig);
+          parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
         }
         if (parquetTableMetadata != null) {
           usedMetadataCache = true;
@@ -283,7 +286,7 @@ protected void initInternal() throws IOException {
                     (oldFs, newFs) -> newFs,
                     LinkedHashMap::new));
 
-        parquetTableMetadata = Metadata.getParquetTableMetadata(statusMap, formatConfig);
+        parquetTableMetadata = Metadata.getParquetTableMetadata(statusMap, readerConfig);
       }
     }
   }
@@ -362,7 +365,8 @@ private boolean checkForInitializingEntriesWithSelectionRoot() {
    * @param metaFilePath metadata cache file path
    * @return file selection read from cache
    *
-   * @throws UserException when the updated selection is empty, this happens if the user selects an empty folder.
+   * @throws org.apache.drill.common.exceptions.UserException when the updated selection is empty,
+   * this happens if the user selects an empty folder.
    */
   private FileSelection expandSelectionFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException {
     // get the metadata for the root directory by reading the metadata file
@@ -371,14 +375,14 @@ private FileSelection expandSelectionFromMetadataCache(FileSelection selection,
 
     // get (and set internal field) the metadata for the directory by reading the metadata file
     FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
-    parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaFilePath, metaContext, formatConfig);
+    parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaFilePath, metaContext, readerConfig);
     if (ignoreExpandingSelection(parquetTableMetadata)) {
       return selection;
     }
     if (formatConfig.areCorruptDatesAutoCorrected()) {
       ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
     }
-    ParquetReaderUtility.correctBinaryInMetadataCache(parquetTableMetadata);
+    ParquetReaderUtility.transformBinaryInMetadataCache(parquetTableMetadata, readerConfig);
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
     if (fileSet == null) {
@@ -412,7 +416,7 @@ private FileSelection expandSelectionFromMetadataCache(FileSelection selection,
         if (status.isDirectory()) {
           //TODO [DRILL-4496] read the metadata cache files in parallel
           final Path metaPath = new Path(cacheFileRoot, Metadata.METADATA_FILENAME);
-          final ParquetTableMetadataBase metadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig);
+          final ParquetTableMetadataBase metadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
           if (ignoreExpandingSelection(metadata)) {
             return selection;
           }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 6efd44dbb78..c59cdce8060 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -18,8 +18,6 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -45,6 +43,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -137,22 +136,22 @@ protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel pro
     // then we could not pushed down. Otherwise, it's qualified to be pushed down.
     final List<RexNode> predList = RelOptUtil.conjunctions(condition);
 
-    final List<RexNode> qualifiedPredList = Lists.newArrayList();
+    final List<RexNode> qualifiedPredList = new ArrayList<>();
 
     for (final RexNode pred : predList) {
-      if (DrillRelOptUtil.findOperators(pred, ImmutableList.of(), BANNED_OPERATORS) == null) {
+      if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) == null) {
         qualifiedPredList.add(pred);
       }
     }
 
-    final RexNode qualifedPred = RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);
+    final RexNode qualifiedPred = RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);
 
-    if (qualifedPred == null) {
+    if (qualifiedPred == null) {
       return;
     }
 
     LogicalExpression conditionExp = DrillOptiq.toDrill(
-        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifedPred);
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifiedPred);
 
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
@@ -170,7 +169,7 @@ protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel pro
     RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
 
     if (project != null) {
-      newScan = project.copy(project.getTraitSet(), ImmutableList.of(newScan));
+      newScan = project.copy(project.getTraitSet(), Collections.singletonList(newScan));
     }
 
     if (newGroupScan instanceof AbstractParquetGroupScan) {
@@ -184,10 +183,11 @@ protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel pro
       }
       if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
         call.transformTo(newScan);
+        return;
       }
     }
 
-    final RelNode newFilter = filter.copy(filter.getTraitSet(), ImmutableList.<RelNode>of(newScan));
+    final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newScan));
     call.transformTo(newFilter);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
new file mode 100644
index 00000000000..3b668c75712
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+
+import java.util.Objects;
+
+/**
+ * Stores consolidated parquet reading configuration. Can obtain config values from various sources:
+ * Assignment priority of configuration values is the following:
+ * <li>parquet format config</li>
+ * <li>Hadoop configuration</li>
+ * <li>session options</li>
+ *
+ * During serialization does not deserialize the default values in keep serialized object smaller.
+ */
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class ParquetReaderConfig {
+
+  public static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+  public static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+  public static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
+  private static final ParquetReaderConfig DEFAULT_INSTANCE = new ParquetReaderConfig();
+
+  private boolean enableBytesReadCounter = false;
+  private boolean enableBytesTotalCounter = false;
+  private boolean enableTimeReadCounter = false;
+  private boolean autoCorrectCorruptedDates = true;
+  private boolean enableStringsSignedMinMax = false;
+
+  public static ParquetReaderConfig.Builder builder() {
+    return new ParquetReaderConfig.Builder();
+  }
+
+  public static ParquetReaderConfig getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  @JsonCreator
+  public ParquetReaderConfig(@JsonProperty("enableBytesReadCounter") Boolean enableBytesReadCounter,
+                             @JsonProperty("enableBytesTotalCounter") Boolean enableBytesTotalCounter,
+                             @JsonProperty("enableTimeReadCounter") Boolean enableTimeReadCounter,
+                             @JsonProperty("autoCorrectCorruptedDates") Boolean autoCorrectCorruptedDates,
+                             @JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax) {
+    this.enableBytesReadCounter = enableBytesReadCounter == null ? this.enableBytesReadCounter : enableBytesReadCounter;
+    this.enableBytesTotalCounter = enableBytesTotalCounter == null ? this.enableBytesTotalCounter : enableBytesTotalCounter;
+    this.enableTimeReadCounter = enableTimeReadCounter == null ? this.enableTimeReadCounter : enableTimeReadCounter;
+    this.autoCorrectCorruptedDates = autoCorrectCorruptedDates == null ? this.autoCorrectCorruptedDates : autoCorrectCorruptedDates;
+    this.enableStringsSignedMinMax = enableStringsSignedMinMax == null ? this.enableStringsSignedMinMax : enableStringsSignedMinMax;
+  }
+
+  private ParquetReaderConfig() { }
+
+  @JsonProperty("enableBytesReadCounter")
+  public boolean enableBytesReadCounter() {
+    return enableBytesReadCounter;
+  }
+
+  @JsonProperty("enableBytesTotalCounter")
+  public boolean enableBytesTotalCounter() {
+    return enableBytesTotalCounter;
+  }
+
+  @JsonProperty("enableTimeReadCounter")
+  public boolean enableTimeReadCounter() {
+    return enableTimeReadCounter;
+  }
+
+  @JsonProperty("autoCorrectCorruptedDates")
+  public boolean autoCorrectCorruptedDates() {
+    return autoCorrectCorruptedDates;
+  }
+
+  @JsonProperty("enableStringsSignedMinMax")
+  public boolean enableStringsSignedMinMax() {
+    return enableStringsSignedMinMax;
+  }
+
+  public ParquetReadOptions toReadOptions() {
+    return ParquetReadOptions.builder()
+      .useSignedStringMinMax(enableStringsSignedMinMax)
+      .build();
+  }
+
+  public Configuration addCountersToConf(Configuration conf) {
+    Configuration newConfig = new Configuration(conf);
+    newConfig.setBoolean(ENABLE_BYTES_READ_COUNTER, enableBytesReadCounter);
+    newConfig.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, enableBytesTotalCounter);
+    newConfig.setBoolean(ENABLE_TIME_READ_COUNTER, enableTimeReadCounter);
+    return newConfig;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(enableBytesReadCounter,
+      enableBytesTotalCounter,
+      enableTimeReadCounter,
+      autoCorrectCorruptedDates,
+      enableStringsSignedMinMax);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ParquetReaderConfig that = (ParquetReaderConfig) o;
+    return enableBytesReadCounter == that.enableBytesReadCounter
+      && enableBytesTotalCounter == that.enableBytesTotalCounter
+      && enableTimeReadCounter == that.enableTimeReadCounter
+      && autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
+      && enableStringsSignedMinMax == that.enableStringsSignedMinMax;
+  }
+
+  @Override
+  public String toString() {
+    return "ParquetReaderConfig{"
+      + "enableBytesReadCounter=" + enableBytesReadCounter
+      + ", enableBytesTotalCounter=" + enableBytesTotalCounter
+      + ", enableTimeReadCounter=" + enableTimeReadCounter
+      + ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
+      + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+      + '}';
+  }
+
+  public static class Builder {
+
+    private ParquetFormatConfig formatConfig;
+    private Configuration conf;
+    private OptionManager options;
+
+    public Builder withFormatConfig(ParquetFormatConfig formatConfig) {
+      this.formatConfig = formatConfig;
+      return this;
+    }
+
+    public Builder withConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder withOptions(OptionManager options) {
+      this.options = options;
+      return this;
+    }
+
+    public ParquetReaderConfig build() {
+      ParquetReaderConfig readerConfig = new ParquetReaderConfig();
+
+      // first assign configuration values from format config
+      if (formatConfig != null) {
+        readerConfig.autoCorrectCorruptedDates = formatConfig.areCorruptDatesAutoCorrected();
+        readerConfig.enableStringsSignedMinMax = formatConfig.isStringsSignedMinMaxEnabled();
+      }
+
+      // then assign configuration values from Hadoop configuration
+      if (conf != null) {
+        readerConfig.enableBytesReadCounter = conf.getBoolean(ENABLE_BYTES_READ_COUNTER, readerConfig.enableBytesReadCounter);
+        readerConfig.enableBytesTotalCounter = conf.getBoolean(ENABLE_BYTES_TOTAL_COUNTER, readerConfig.enableBytesTotalCounter);
+        readerConfig.enableTimeReadCounter = conf.getBoolean(ENABLE_TIME_READ_COUNTER, readerConfig.enableTimeReadCounter);
+      }
+
+      // last assign values from session options, session options have higher priority than other configurations
+      if (options != null) {
+        String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
+        if (!option.isEmpty()) {
+          readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option);
+        }
+      }
+
+      return readerConfig;
+    }
+
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 8bfdf18cb78..733915b6124 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -28,6 +27,7 @@
 import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.work.ExecErrorConstants;
+import org.apache.hadoop.util.VersionUtil;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -52,6 +52,7 @@
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -98,6 +99,9 @@
    * Prior versions had dates formatted with {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#CORRECT_CORRUPT_DATE_SHIFT}
    */
   public static final int DRILL_WRITER_VERSION_STD_DATE_FORMAT = 2;
+
+  public static final String ALLOWED_DRILL_VERSION_FOR_BINARY = "1.15.0";
+
   /**
    * For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
    * based on the file metadata. For older files that lack statistics we must actually test the values
@@ -275,42 +279,60 @@ else if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetada
   }
 
   /**
-   * Checks assigns byte arrays to min/max values obtained from the deserialized string
-   * for BINARY.
+   *
+   * Transforms values for min / max binary statistics to byte array.
+   * Transformation logic depends on metadata file version.
    *
    * @param parquetTableMetadata table metadata that should be corrected
+   * @param readerConfig parquet reader config
    */
-  public static void correctBinaryInMetadataCache(ParquetTableMetadataBase parquetTableMetadata) {
+  public static void transformBinaryInMetadataCache(ParquetTableMetadataBase parquetTableMetadata, ParquetReaderConfig readerConfig) {
     // Looking for the names of the columns with BINARY data type
     // in the metadata cache file for V2 and all v3 versions
     Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
+    boolean allowBinaryMetadata = allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig);
 
     for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
       for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
         Long rowCount = rowGroupMetadata.getRowCount();
         for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
-          // Setting Min/Max values for ParquetTableMetadata_v1
+          // Setting Min / Max values for ParquetTableMetadata_v1
           if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
             if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
                 || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-              setMinMaxValues(columnMetadata, rowCount);
+              setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
             }
           }
-          // Setting Min/Max values for V2 and all V3 versions before V3_3
+          // Setting Min / Max values for V2 and all V3 versions prior to V3_3
           else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) < 0
                     && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
-            setMinMaxValues(columnMetadata, rowCount);
+            setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
           }
-          // Setting Min/Max values for V3_3 and all younger versions
+          // Setting Min / Max values for V3_3 and all next versions
           else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0
                       && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
-            convertMinMaxValues(columnMetadata, rowCount);
+            setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, true);
           }
         }
       }
     }
   }
 
+  /**
+   * If binary metadata was stored prior to Drill version {@link #ALLOWED_DRILL_VERSION_FOR_BINARY},
+   * it might have incorrectly defined min / max values.
+   * In case if given version is null, we assume this version is prior to {@link #ALLOWED_DRILL_VERSION_FOR_BINARY}.
+   * In this case we allow reading such metadata only if {@link ParquetReaderConfig#enableStringsSignedMinMax()} is true.
+   *
+   * @param drillVersion drill version used to create metadata file
+   * @param readerConfig parquet reader configuration
+   * @return true if reading binary min / max values are allowed, false otherwise
+   */
+  private static boolean allowBinaryMetadata(String drillVersion, ParquetReaderConfig readerConfig) {
+    return readerConfig.enableStringsSignedMinMax() ||
+      (drillVersion != null && VersionUtil.compareVersions(ALLOWED_DRILL_VERSION_FOR_BINARY, drillVersion) <= 0);
+  }
+
   /**
    * Returns the set of the lists with names of the columns with BINARY or
    * FIXED_LEN_BYTE_ARRAY data type from {@code ParquetTableMetadataBase columnTypeMetadataCollection}
@@ -320,7 +342,7 @@ else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareT
    * @return set of the lists with column names
    */
   private static Set<List<String>> getBinaryColumnsNames(ParquetTableMetadataBase parquetTableMetadata) {
-    Set<List<String>> names = Sets.newHashSet();
+    Set<List<String>> names = new HashSet<>();
     if (parquetTableMetadata instanceof ParquetTableMetadata_v2) {
       for (ColumnTypeMetadata_v2 columnTypeMetadata :
         ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
@@ -342,39 +364,36 @@ else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareT
   }
 
   /**
-   * Checks that column has single value and replaces Min and Max by their byte values
-   * in {@code Metadata.ColumnMetadata columnMetadata} if their values were stored as strings.
+   * If binary metadata is not allowed (logic is defined in
+   * {@link ParquetReaderUtility#allowBinaryMetadata(String, ParquetReaderConfig)}),
+   * set min max values for binary column, only if min and max values are the same,
+   * otherwise set min and max as nulls.
    *
    * @param columnMetadata column metadata that should be changed
-   * @param rowCount       rows count in column chunk
+   * @param rowCount rows count in column chunk
+   * @param allowBinaryMetadata if reading binary metadata is allowed
+   * @param needsDecoding if min and max values is Base64 data and should be decoded
    */
-  private static void setMinMaxValues(ColumnMetadata columnMetadata, long rowCount) {
-    if (columnMetadata.hasSingleValue(rowCount)) {
-      Object minValue = columnMetadata.getMinValue();
-      if (minValue != null && minValue instanceof String) {
-        byte[] bytes = ((String) minValue).getBytes();
-        columnMetadata.setMax(bytes);
-        columnMetadata.setMin(bytes);
-      }
-    }
-  }
+  private static void setMinMaxValues(ColumnMetadata columnMetadata, long rowCount, boolean allowBinaryMetadata, boolean needsDecoding) {
+    byte[] minBytes = null;
+    byte[] maxBytes = null;
 
-  /**
-   * Checks that column has single value and replaces Min and Max by their byte values from Base64 data
-   * in Metadata.ColumnMetadata columnMetadata if their values were stored as strings.
-   *
-   * @param columnMetadata column metadata that should be changed
-   * @param rowCount       rows count in column chunk
-   */
-  private static void convertMinMaxValues(ColumnMetadata columnMetadata, long rowCount) {
-    if (columnMetadata.hasSingleValue(rowCount)) {
+    boolean hasSingleValue = false;
+    if (allowBinaryMetadata || (hasSingleValue = columnMetadata.hasSingleValue(rowCount))) {
       Object minValue = columnMetadata.getMinValue();
-      if (minValue != null && minValue instanceof String) {
-        byte[] bytes = Base64.decodeBase64(((String) minValue).getBytes());
-        columnMetadata.setMax(bytes);
-        columnMetadata.setMin(bytes);
+      Object maxValue = columnMetadata.getMaxValue();
+      if (minValue instanceof String && maxValue instanceof String) {
+        minBytes = ((String) minValue).getBytes();
+        maxBytes = ((String) maxValue).getBytes();
+        if (needsDecoding) {
+          minBytes = Base64.decodeBase64(minBytes);
+          maxBytes = hasSingleValue ? minBytes : Base64.decodeBase64(maxBytes);
+        }
       }
     }
+
+    columnMetadata.setMin(minBytes);
+    columnMetadata.setMax(maxBytes);
   }
 
   /**
@@ -522,7 +541,7 @@ public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(Parquet
    * @return  Timestamp in milliseconds - the number of milliseconds since January 1, 1970, 00:00:00 GMT
    *          represented by @param binaryTimeStampValue.
    *          The nanos precision is cut to millis. Therefore the length of single timestamp value is
-   *          {@value NullableTimeStampHolder#WIDTH} bytes instead of 12 bytes.
+   *          {@value org.apache.drill.exec.expr.holders.NullableTimeStampHolder#WIDTH} bytes instead of 12 bytes.
    */
     public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue, boolean retainLocalTimezone) {
       // This method represents binaryTimeStampValue as ByteBuffer, where timestamp is stored as sum of
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index cbcca79b9c4..eabe2df1447 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -54,12 +54,14 @@ public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
                              @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
                              @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
                              @JsonProperty("columns") List<SchemaPath> columns,
+                             @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                              @JsonProperty("selectionRoot") String selectionRoot,
                              @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
         rowGroupReadEntries,
         columns,
+        readerConfig,
         selectionRoot,
         filter);
   }
@@ -68,9 +70,10 @@ public ParquetRowGroupScan(String userName,
                              ParquetFormatPlugin formatPlugin,
                              List<RowGroupReadEntry> rowGroupReadEntries,
                              List<SchemaPath> columns,
+                             ParquetReaderConfig readerConfig,
                              String selectionRoot,
                              LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, filter);
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
     this.formatConfig = formatPlugin.getConfig();
     this.selectionRoot = selectionRoot;
@@ -99,7 +102,7 @@ public ParquetFormatPlugin getStorageEngine() {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
   }
 
   @Override
@@ -109,12 +112,7 @@ public int getOperatorType() {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
-  }
-
-  @Override
-  public boolean areCorruptDatesAutoCorrected() {
-    return formatConfig.areCorruptDatesAutoCorrected();
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index b16a4ccb261..0db007ab6c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -35,7 +36,6 @@
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.store.dfs.MetadataContext;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -45,7 +45,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -64,6 +63,7 @@
 import java.io.OutputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -88,16 +88,15 @@
   public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
   public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
-  public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled";
 
-  private final ParquetFormatConfig formatConfig;
+  private final ParquetReaderConfig readerConfig;
 
   private ParquetTableMetadataBase parquetTableMetadata;
   private ParquetTableMetadataDirs parquetTableMetadataDirs;
 
 
-  private Metadata(ParquetFormatConfig formatConfig) {
-    this.formatConfig = formatConfig;
+  private Metadata(ParquetReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
   }
 
   /**
@@ -105,9 +104,10 @@ private Metadata(ParquetFormatConfig formatConfig) {
    *
    * @param fs file system
    * @param path path
+   * @param readerConfig parquet reader configuration
    */
-  public static void createMeta(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException {
-    Metadata metadata = new Metadata(formatConfig);
+  public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException {
+    Metadata metadata = new Metadata(readerConfig);
     metadata.createMetaFilesRecursively(path, fs);
   }
 
@@ -116,11 +116,12 @@ public static void createMeta(FileSystem fs, String path, ParquetFormatConfig fo
    *
    * @param fs file system
    * @param path path
+   * @param readerConfig parquet reader configuration
+   *
    * @return parquet table metadata
    */
-  public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, String path, ParquetFormatConfig formatConfig)
-      throws IOException {
-    Metadata metadata = new Metadata(formatConfig);
+  public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException {
+    Metadata metadata = new Metadata(readerConfig);
     return metadata.getParquetTableMetadata(path, fs);
   }
 
@@ -128,12 +129,12 @@ public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, Str
    * Get the parquet metadata for a list of parquet files.
    *
    * @param fileStatusMap file statuses and corresponding file systems
-   * @param formatConfig parquet format config
+   * @param readerConfig parquet reader configuration
    * @return parquet table metadata
    */
   public static ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap,
-                                                                ParquetFormatConfig formatConfig) throws IOException {
-    Metadata metadata = new Metadata(formatConfig);
+                                                                ParquetReaderConfig readerConfig) throws IOException {
+    Metadata metadata = new Metadata(readerConfig);
     return metadata.getParquetTableMetadata(fileStatusMap);
   }
 
@@ -143,15 +144,17 @@ public static ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, Fi
    * @param fs current file system
    * @param path The path to the metadata file, located in the directory that contains the parquet files
    * @param metaContext metadata context
-   * @param formatConfig parquet format plugin configs
+   * @param readerConfig parquet reader configuration
    * @return parquet table metadata. Null if metadata cache is missing, unsupported or corrupted
    */
-  public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs, Path path, MetadataContext metaContext,
-      ParquetFormatConfig formatConfig) {
+  public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs,
+                                                                 Path path,
+                                                                 MetadataContext metaContext,
+                                                                 ParquetReaderConfig readerConfig) {
     if (ignoreReadingMetadata(metaContext, path)) {
       return null;
     }
-    Metadata metadata = new Metadata(formatConfig);
+    Metadata metadata = new Metadata(readerConfig);
     metadata.readBlockMeta(path, false, metaContext, fs);
     return metadata.parquetTableMetadata;
   }
@@ -162,15 +165,17 @@ public static ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, Fi
    * @param fs current file system
    * @param path The path to the metadata file, located in the directory that contains the parquet files
    * @param metaContext metadata context
-   * @param formatConfig parquet format plugin configs
+   * @param readerConfig parquet reader configuration
    * @return parquet metadata for a directory. Null if metadata cache is missing, unsupported or corrupted
    */
-  public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, Path path,
-      MetadataContext metaContext, ParquetFormatConfig formatConfig) {
+  public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs,
+                                                                    Path path,
+                                                                    MetadataContext metaContext,
+                                                                    ParquetReaderConfig readerConfig) {
     if (ignoreReadingMetadata(metaContext, path)) {
       return null;
     }
-    Metadata metadata = new Metadata(formatConfig);
+    Metadata metadata = new Metadata(readerConfig);
     metadata.readBlockMeta(path, true, metaContext, fs);
     return metadata.parquetTableMetadataDirs;
   }
@@ -412,12 +417,9 @@ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3
     final ParquetMetadata metadata;
     final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
     final Configuration conf = new Configuration(fs.getConf());
-    final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
-        .useSignedStringMinMax(true)
-        .build();
     try {
       metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
-        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) {
+        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
           return parquetFileReader.getFooter();
         }
       });
@@ -429,8 +431,7 @@ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3
 
     MessageType schema = metadata.getFileMetaData().getSchema();
 
-//    Map<SchemaPath, OriginalType> originalTypeMap = Maps.newHashMap();
-    Map<SchemaPath, ColTypeInfo> colTypeInfoMap = Maps.newHashMap();
+    Map<SchemaPath, ColTypeInfo> colTypeInfoMap = new HashMap<>();
     schema.getPaths();
     for (String[] path : schema.getPaths()) {
       colTypeInfoMap.put(SchemaPath.getCompoundPath(path), getColTypeInfo(schema, schema, path, 0));
@@ -440,11 +441,10 @@ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3
 
     ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
     ALL_COLS.add(SchemaPath.STAR_COLUMN);
-    boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
-    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
-    if (logger.isDebugEnabled()) {
-      logger.debug(containsCorruptDates.toString());
-    }
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS,
+      readerConfig.autoCorrectCorruptedDates());
+    logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+
     for (BlockMetaData rowGroup : metadata.getBlocks()) {
       List<ColumnMetadata_v3> columnMetadataList = new ArrayList<>();
       long length = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
index e8f2b797111..8ce5f37a76c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.stat;
 
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -29,10 +30,12 @@
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.joda.time.DateTimeConstants;
 
+import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +53,6 @@
   private final ParquetTableMetadataBase parquetTableMetadata;
   private final List<? extends ColumnMetadata> columnMetadataList;
   private final Map<String, String> implicitColValues;
-
   public ParquetMetaStatCollector(ParquetTableMetadataBase parquetTableMetadata,
       List<? extends ColumnMetadata> columnMetadataList, Map<String, String> implicitColValues) {
     this.parquetTableMetadata = parquetTableMetadata;
@@ -86,30 +88,26 @@ public ParquetMetaStatCollector(ParquetTableMetadataBase parquetTableMetadata,
       columnMetadataMap.put(schemaPath, columnMetadata);
     }
 
-    for (final SchemaPath field : fields) {
-      final PrimitiveType.PrimitiveTypeName primitiveType;
-      final OriginalType originalType;
-
-      final ColumnMetadata columnMetadata = columnMetadataMap.get(field.getUnIndexed());
-
+    for (SchemaPath field : fields) {
+      ColumnMetadata columnMetadata = columnMetadataMap.get(field.getUnIndexed());
       if (columnMetadata != null) {
-        final Object min = columnMetadata.getMinValue();
-        final Object max = columnMetadata.getMaxValue();
-        final long numNulls = columnMetadata.getNulls() == null ? -1 : columnMetadata.getNulls();
-
-        primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
-        originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
-        int precision = 0;
-        int scale = 0;
+        ColumnStatisticsBuilder statisticsBuilder = ColumnStatisticsBuilder.builder()
+          .setMin(columnMetadata.getMinValue())
+          .setMax(columnMetadata.getMaxValue())
+          .setNumNulls(columnMetadata.getNulls() == null ? GroupScan.NO_COLUMN_STATS: columnMetadata.getNulls())
+          .setPrimitiveType(parquetTableMetadata.getPrimitiveType(columnMetadata.getName()))
+          .setOriginalType(parquetTableMetadata.getOriginalType(columnMetadata.getName()));
+
         // ColumnTypeMetadata_v3 stores information about scale and precision
         if (parquetTableMetadata instanceof ParquetTableMetadata_v3) {
           ColumnTypeMetadata_v3 columnTypeInfo = ((ParquetTableMetadata_v3) parquetTableMetadata)
                                                                           .getColumnTypeInfo(columnMetadata.getName());
-          scale = columnTypeInfo.scale;
-          precision = columnTypeInfo.precision;
+          statisticsBuilder.setScale(columnTypeInfo.scale);
+          statisticsBuilder.setPrecision(columnTypeInfo.precision);
         }
 
-        statMap.put(field, getStat(min, max, numNulls, primitiveType, originalType, scale, precision));
+        statMap.put(field, statisticsBuilder.build());
+
       } else {
         final String columnName = field.getRootSegment().getPath();
         if (implicitColValues.containsKey(columnName)) {
@@ -132,62 +130,172 @@ public ParquetMetaStatCollector(ParquetTableMetadataBase parquetTableMetadata,
   }
 
   /**
-   * Builds column statistics using given primitiveType, originalType, scale,
-   * precision, numNull, min and max values.
+   * Helper class that creates parquet {@link ColumnStatistics} based on given
+   * min and max values, type, number of nulls, precision and scale.
    *
-   * @param min             min value for statistics
-   * @param max             max value for statistics
-   * @param numNulls        num_nulls for statistics
-   * @param primitiveType   type that determines statistics class
-   * @param originalType    type that determines statistics class
-   * @param scale           scale value (used for DECIMAL type)
-   * @param precision       precision value (used for DECIMAL type)
-   * @return column statistics
    */
-  private ColumnStatistics getStat(Object min, Object max, long numNulls,
-                                   PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType,
-                                   int scale, int precision) {
-    Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
-    Statistics convertedStat = stat;
-
-    TypeProtos.MajorType type = ParquetReaderUtility.getType(primitiveType, originalType, scale, precision);
-    stat.setNumNulls(numNulls);
-
-    if (min != null && max != null ) {
-      switch (type.getMinorType()) {
-      case INT :
-      case TIME:
-        ((IntStatistics) stat).setMinMax(Integer.parseInt(min.toString()), Integer.parseInt(max.toString()));
-        break;
-      case BIGINT:
-      case TIMESTAMP:
-        ((LongStatistics) stat).setMinMax(Long.parseLong(min.toString()), Long.parseLong(max.toString()));
-        break;
-      case FLOAT4:
-        ((FloatStatistics) stat).setMinMax(Float.parseFloat(min.toString()), Float.parseFloat(max.toString()));
-        break;
-      case FLOAT8:
-        ((DoubleStatistics) stat).setMinMax(Double.parseDouble(min.toString()), Double.parseDouble(max.toString()));
-        break;
-      case DATE:
-        convertedStat = new LongStatistics();
-        convertedStat.setNumNulls(stat.getNumNulls());
-        final long minMS = convertToDrillDateValue(Integer.parseInt(min.toString()));
-        final long maxMS = convertToDrillDateValue(Integer.parseInt(max.toString()));
-        ((LongStatistics) convertedStat ).setMinMax(minMS, maxMS);
-        break;
-      case BIT:
-        ((BooleanStatistics) stat).setMinMax(Boolean.parseBoolean(min.toString()), Boolean.parseBoolean(max.toString()));
-        break;
-      default:
-      }
+  private static class ColumnStatisticsBuilder {
+
+    private Object min;
+    private Object max;
+    private long numNulls;
+    private PrimitiveType.PrimitiveTypeName primitiveType;
+    private OriginalType originalType;
+    private int scale;
+    private int precision;
+
+    static ColumnStatisticsBuilder builder() {
+      return new ColumnStatisticsBuilder();
     }
 
-    return new ColumnStatistics(convertedStat, type);
-  }
+    ColumnStatisticsBuilder setMin(Object min) {
+      this.min = min;
+      return this;
+    }
 
-  private static long convertToDrillDateValue(int dateValue) {
+    ColumnStatisticsBuilder setMax(Object max) {
+      this.max = max;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setNumNulls(long numNulls) {
+      this.numNulls = numNulls;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setPrimitiveType(PrimitiveType.PrimitiveTypeName primitiveType) {
+      this.primitiveType = primitiveType;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setOriginalType(OriginalType originalType) {
+      this.originalType = originalType;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setScale(int scale) {
+      this.scale = scale;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setPrecision(int precision) {
+      this.precision = precision;
+      return this;
+    }
+
+
+    /**
+     * Builds column statistics using given primitive and original types,
+     * scale, precision, number of nulls, min and max values.
+     * Min and max values for binary statistics are set only if allowed.
+     *
+     * @return column statistics
+     */
+    ColumnStatistics build() {
+      Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
+      Statistics convertedStat = stat;
+
+      TypeProtos.MajorType type = ParquetReaderUtility.getType(primitiveType, originalType, scale, precision);
+      stat.setNumNulls(numNulls);
+
+      if (min != null && max != null) {
+        switch (type.getMinorType()) {
+          case INT :
+          case TIME:
+            ((IntStatistics) stat).setMinMax(Integer.parseInt(min.toString()), Integer.parseInt(max.toString()));
+            break;
+          case BIGINT:
+          case TIMESTAMP:
+            ((LongStatistics) stat).setMinMax(Long.parseLong(min.toString()), Long.parseLong(max.toString()));
+            break;
+          case FLOAT4:
+            ((FloatStatistics) stat).setMinMax(Float.parseFloat(min.toString()), Float.parseFloat(max.toString()));
+            break;
+          case FLOAT8:
+            ((DoubleStatistics) stat).setMinMax(Double.parseDouble(min.toString()), Double.parseDouble(max.toString()));
+            break;
+          case DATE:
+            convertedStat = new LongStatistics();
+            convertedStat.setNumNulls(stat.getNumNulls());
+            long minMS = convertToDrillDateValue(Integer.parseInt(min.toString()));
+            long maxMS = convertToDrillDateValue(Integer.parseInt(max.toString()));
+            ((LongStatistics) convertedStat ).setMinMax(minMS, maxMS);
+            break;
+          case BIT:
+            ((BooleanStatistics) stat).setMinMax(Boolean.parseBoolean(min.toString()), Boolean.parseBoolean(max.toString()));
+            break;
+          case VARCHAR:
+            if (min instanceof Binary && max instanceof Binary) { // when read directly from parquet footer
+              ((BinaryStatistics) stat).setMinMaxFromBytes(((Binary) min).getBytes(), ((Binary) max).getBytes());
+            } else if (min instanceof byte[] && max instanceof byte[]) { // when deserialized from Drill metadata file
+              ((BinaryStatistics) stat).setMinMaxFromBytes((byte[]) min, (byte[]) max);
+            } else {
+              logger.trace("Unexpected class for Varchar statistics for min / max values. Min: {}. Max: {}.",
+                min.getClass(), max.getClass());
+            }
+            break;
+          case VARDECIMAL:
+            byte[] minBytes = null;
+            byte[] maxBytes = null;
+            boolean setLength = false;
+
+            switch (primitiveType) {
+              case INT32:
+              case INT64:
+                minBytes = new BigInteger(min.toString()).toByteArray();
+                maxBytes = new BigInteger(max.toString()).toByteArray();
+                break;
+              case FIXED_LEN_BYTE_ARRAY:
+                setLength = true;
+                // fall through
+              case BINARY:
+                // wrap up into BigInteger to avoid PARQUET-1417
+                if (min instanceof Binary && max instanceof Binary) { // when read directly from parquet footer
+                  minBytes = new BigInteger(((Binary) min).getBytes()).toByteArray();
+                  maxBytes = new BigInteger(((Binary) max).getBytes()).toByteArray();
+                } else if (min instanceof byte[] && max instanceof byte[]) {  // when deserialized from Drill metadata file
+                  minBytes = new BigInteger((byte[]) min).toByteArray();
+                  maxBytes = new BigInteger((byte[]) max).toByteArray();
+                } else {
+                  logger.trace("Unexpected class for Binary Decimal statistics for min / max values. Min: {}. Max: {}.",
+                    min.getClass(), max.getClass());
+                }
+                break;
+              default:
+                logger.trace("Unexpected primitive type [{}] for Decimal statistics.", primitiveType);
+            }
+
+            if (minBytes == null || maxBytes == null) {
+              break;
+            }
+
+            int length = setLength ? maxBytes.length : 0;
+
+            PrimitiveType decimalType = org.apache.parquet.schema.Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+              .as(OriginalType.DECIMAL)
+              .length(length)
+              .precision(precision)
+              .scale(scale)
+              .named("decimal_type");
+
+            convertedStat = Statistics.getBuilderForReading(decimalType)
+              .withMin(minBytes)
+              .withMax(maxBytes)
+              .withNumNulls(numNulls)
+              .build();
+            break;
+
+          default:
+            logger.trace("Unsupported minor type [{}] for parquet statistics.", type.getMinorType());
+        }
+      }
+      return new ColumnStatistics(convertedStat, type);
+    }
+
+    private long convertToDrillDateValue(int dateValue) {
       return dateValue * (long) DateTimeConstants.MILLIS_PER_DAY;
+    }
+
   }
 
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f083c6603c8..02df7ea85d9 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -591,6 +591,7 @@ drill.exec.options: {
     store.parquet.page-size: 1048576,
     store.parquet.reader.columnreader.async: false,
     store.parquet.reader.int96_as_timestamp: false,
+    store.parquet.reader.strings_signed_min_max: "",
     store.parquet.reader.pagereader.async: true,
     store.parquet.reader.pagereader.bufferedread: true,
     store.parquet.reader.pagereader.buffersize: 1048576,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
index 0cbda0387cb..3be2fe5549c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
@@ -82,7 +82,7 @@ public void testDRILL3410() throws Exception {
     test("use dfs.tmp");
     test("create table drill_3410 partition by (o_orderpriority) as select * from dfs.`multilevel/parquet`");
     String query = "select * from drill_3410 where (o_orderpriority = '1-URGENT' and o_orderkey = 10) or (o_orderpriority = '2-HIGH' or o_orderkey = 11)";
-    testIncludeFilter(query, 1, "Filter\\(", 34);
+    testIncludeFilter(query, 3, "Filter\\(", 34);
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
index 20438cdedde..7e68dd43cf5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
@@ -35,7 +35,9 @@ public void testParquet() throws Exception {
       testPhysicalPlanSubmission(
           String.format("select * from table(cp.`%s`(type=>'parquet'))", "parquet/alltypes_required.parquet"),
           String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>false))", "parquet/alltypes_required.parquet"),
-          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true))", "parquet/alltypes_required.parquet"));
+          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true))", "parquet/alltypes_required.parquet"),
+          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true, enableStringsSignedMinMax=>false))", "parquet/alltypes_required.parquet"),
+          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>false, enableStringsSignedMinMax=>true))", "parquet/alltypes_required.parquet"));
     } finally {
       resetSessionOption(ExecConstants.SLICE_TARGET);
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index e53c394ece1..f43afb7b3f1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -54,7 +54,7 @@ public void test() {
           assertEquals("(type: String, name: String)", d.presentParams());
           break;
         case "parquet":
-          assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates: boolean)", d.presentParams());
+          assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates: boolean, enableStringsSignedMinMax: boolean)", d.presentParams());
           break;
         case "json":
         case "sequencefile":
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 525e7ed7227..e80497a6c06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.commons.io.filefilter.FalseFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.drill.PlanTestBase;
@@ -27,7 +26,6 @@
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -41,7 +39,6 @@
 import java.nio.file.Paths;
 import java.nio.file.attribute.FileTime;
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -53,7 +50,7 @@
   private static final String TABLE_NAME_2 = "parquetTable2";
 
   @BeforeClass
-  public static void copyData() throws Exception {
+  public static void copyData() {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(TABLE_NAME_1));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(TABLE_NAME_2));
@@ -790,71 +787,6 @@ public void testIntervalYearPartitionPruning() throws Exception {
     }
   }
 
-  @Test // DRILL-4139
-  public void testVarCharWithNullsPartitionPruning() throws Exception {
-    final String intervalYearPartitionTable = "dfs.tmp.`varchar_optional_partition`";
-    try {
-      test("create table %s partition by (col_vrchr) as " +
-        "select * from cp.`parquet/alltypes_optional.parquet`", intervalYearPartitionTable);
-
-      String query = String.format("select * from %s where col_vrchr = 'Nancy Cloke'",
-        intervalYearPartitionTable);
-      int expectedRowCount = 1;
-
-      int actualRowCount = testSql(query);
-      assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
-
-      test("refresh table metadata %s", intervalYearPartitionTable);
-
-      actualRowCount = testSql(query);
-      assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
-    } finally {
-      test("drop table if exists %s", intervalYearPartitionTable);
-    }
-  }
-
-  @Ignore // Statistics for DECIMAL is not available (see PARQUET-1322).
-  @Test // DRILL-4139
-  public void testDecimalPartitionPruning() throws Exception {
-    List<String> ctasQueries = Lists.newArrayList();
-    // decimal stores as fixed_len_byte_array
-    ctasQueries.add("create table %s partition by (manager_id) as " +
-      "select * from cp.`parquet/fixedlenDecimal.parquet`");
-    // decimal stores as int32
-    ctasQueries.add("create table %s partition by (manager_id) as " +
-      "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
-      "from cp.`parquet/fixedlenDecimal.parquet`");
-    // decimal stores as int64
-    ctasQueries.add("create table %s partition by (manager_id) as " +
-      "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
-      "from cp.`parquet/fixedlenDecimal.parquet`");
-    final String decimalPartitionTable = "dfs.tmp.`decimal_optional_partition`";
-    for (String ctasQuery : ctasQueries) {
-      try {
-        test("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
-        test(ctasQuery, decimalPartitionTable);
-
-        String query = String.format("select * from %s where manager_id = 148", decimalPartitionTable);
-        int expectedRowCount = 6;
-
-        int actualRowCount = testSql(query);
-        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
-
-        test("refresh table metadata %s", decimalPartitionTable);
-
-        actualRowCount = testSql(query);
-        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
-      } finally {
-        test("drop table if exists %s", decimalPartitionTable);
-        test("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
-      }
-    }
-  }
-
   @Test // DRILL-4139
   public void testIntWithNullsPartitionPruning() throws Exception {
     try {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
new file mode 100644
index 00000000000..7bbae415b1d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestParquetReaderConfig {
+
+  @Test
+  public void testDefaultsDeserialization() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().build(); // all defaults
+    String value = mapper.writeValueAsString(readerConfig);
+    assertEquals(ParquetReaderConfig.getDefaultInstance(), readerConfig); // compare with default instance
+    assertEquals("{}", value);
+
+    readerConfig = mapper.readValue(value, ParquetReaderConfig.class);
+    assertTrue(readerConfig.autoCorrectCorruptedDates()); // check that default value is restored
+
+    // change the default: set autoCorrectCorruptedDates to false
+    // keep the default: set enableStringsSignedMinMax to false
+    readerConfig = new ParquetReaderConfig(false, false, false, false, false);
+
+    value = mapper.writeValueAsString(readerConfig);
+    assertEquals("{\"autoCorrectCorruptedDates\":false}", value);
+  }
+
+  @Test
+  public void testAddConfigToConf() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, true);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, true);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, true);
+
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withConf(conf).build();
+    Configuration newConf = readerConfig.addCountersToConf(new Configuration());
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, "true");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, "true");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, "true");
+
+    conf = new Configuration();
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, false);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, false);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, false);
+
+    readerConfig = ParquetReaderConfig.builder().withConf(conf).build();
+    newConf = readerConfig.addCountersToConf(new Configuration());
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, "false");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, "false");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, "false");
+  }
+
+  @Test
+  public void testReadOptions() {
+    // set enableStringsSignedMinMax to true
+    ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true);
+    ParquetReadOptions readOptions = readerConfig.toReadOptions();
+    assertTrue(readOptions.useSignedStringMinMax());
+
+    // set enableStringsSignedMinMax to false
+    readerConfig = new ParquetReaderConfig(false, false, false, true, false);
+    readOptions = readerConfig.toReadOptions();
+    assertFalse(readOptions.useSignedStringMinMax());
+  }
+
+  @Test
+  public void testPriorityAssignmentForStringsSignedMinMax() throws Exception {
+    SystemOptionManager options = new SystemOptionManager(DrillConfig.create()).init();
+
+    // use value from format config
+    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
+    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+
+    // change format config value
+    formatConfig.enableStringsSignedMinMax = true;
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
+    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+
+    // set option, option value should have higher priority
+    options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
+    assertFalse(readerConfig.enableStringsSignedMinMax());
+
+    // set option as empty (undefined), config should have higher priority
+    options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "");
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
+    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+  }
+
+
+  private void checkConfigValue(Configuration conf, String name, String expectedValue) {
+    String actualValue = conf.get(name);
+    assertNotNull(actualValue);
+    assertEquals(expectedValue, actualValue);
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
new file mode 100644
index 00000000000..85c641ea557
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestPushDownAndPruningForDecimal extends ClusterTest {
+
+  private static File fileStore;
+  private List<String> tablesToDrop = new ArrayList<>();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    /*
+      Contains two data files generated by Drill 1.13.0 version (before upgrade to Parquet lib 1.10.0).
+      Also contains .drill.parquet_metadata generated for these two files.
+
+      Schema:
+
+      message root {
+        required int32 part_int_32 (DECIMAL(5,2));
+        required int32 val_int_32 (DECIMAL(5,2));
+        required int64 part_int_64 (DECIMAL(16,2));
+        required int64 val_int_64 (DECIMAL(16,2));
+        required fixed_len_byte_array(12) part_fixed (DECIMAL(16,2));
+        required fixed_len_byte_array(12) val_fixed (DECIMAL(16,2));
+      }
+
+      Data:
+
+      0_0_1.parquet
+      -----------------------------------------------------------------------------
+      part_int_32 | val_int_32 | part_int_64 | val_int_64 | part_fixed | val_fixed
+      -----------------------------------------------------------------------------
+      1.00        | 1.05       | 1.00        | 1.05       | 1.00       | 1.05
+      1.00        | 10.0       | 1.00        | 10.0       | 1.00       | 10.0
+      1.00        | 10.25      | 1.00        | 10.25      | 1.00       | 10.25
+      -----------------------------------------------------------------------------
+
+      0_0_2.parquet
+      -----------------------------------------------------------------------------
+      part_int_32 | val_int_32 | part_int_64 | val_int_64 | part_fixed | val_fixed
+      -----------------------------------------------------------------------------
+      2.00        | 2.05       | 2.00        | 2.05       | 2.00       | 2.05
+      2.00        | 20.0       | 2.00        | 20.0       | 2.00       | 20.0
+      2.00        | 20.25      | 2.00        | 20.25      | 2.00       | 20.25
+      -----------------------------------------------------------------------------
+     */
+    fileStore = dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "decimal_gen_1_13_0"));
+    startCluster(builder);
+  }
+
+  @After
+  public void reset() {
+    // reset all session options that might have been used in the tests
+    client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+    client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+    client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+
+    // drop all tables
+    tablesToDrop.forEach(
+      t -> client.runSqlSilently(String.format("drop table if exists %s", t))
+    );
+    tablesToDrop.clear();
+  }
+
+  /**
+   * Check partition pruning for old and new int_32 and int_64 decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldNewIntDecimalPruningNoMeta() throws Exception {
+    String oldTable = createTable("old_int_decimal_pruning_no_meta", true);
+    String newTable = "dfs.`tmp`.new_int_decimal_pruning_no_meta";
+    tablesToDrop.add(newTable);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", newTable, oldTable)).run();
+    for (String column : Arrays.asList("part_int_32", "part_int_64")) {
+      for (String table : Arrays.asList(oldTable, newTable)) {
+        String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.00 as decimal(5, 2))", table, column);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("val_int_32", "val_int_64")
+          .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05"))
+          .baselineValues(new BigDecimal("10.00"), new BigDecimal("10.00"))
+          .baselineValues(new BigDecimal("10.25"), new BigDecimal("10.25"))
+          .go();
+      }
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new int_32 and int_64 decimal files
+   * using Drill metadata file generated by current Drill version (i.e. after upgrade to parquet 1.10.0)
+   */
+  @Test
+  public void testOldNewIntDecimalPruningWithMeta() throws Exception {
+    String oldTable = createTable("old_int_decimal_pruning_with_meta", true);
+    String newTable = "dfs.`tmp`.new_int_decimal_pruning_with_meta";
+    tablesToDrop.add(newTable);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", newTable, oldTable)).run();
+
+    for (String table : Arrays.asList(oldTable, newTable)) {
+      queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+      for (String column : Arrays.asList("part_int_32", "part_int_64")) {
+        String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(2.00 as decimal(5,2))", table, column);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("val_int_32", "val_int_64")
+          .baselineValues(new BigDecimal("2.05"), new BigDecimal("2.05"))
+          .baselineValues(new BigDecimal("20.00"), new BigDecimal("20.00"))
+          .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25"))
+          .go();
+      }
+    }
+  }
+
+  /**
+   * Check filter push down for old int_32 and int_64 decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldIntDecimalPushDownNoMeta() throws Exception {
+    String table = createTable("old_int_decimal_push_down_no_meta", true);
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      // push down does not work for old int decimal types because stats is not read: PARQUET-1322
+      assertTrue(plan.contains("numRowGroups=2"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old int_32 and int_64 decimal files
+   * using old Drill metadata file, i.e. generated before upgrade to parquet 1.10.0
+   */
+  @Test
+  public void testOldIntDecimalPushDownWithOldMeta() throws Exception {
+    String table = createTable("old_int_decimal_push_down_with_old_meta", false);
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old int_32 and int_64 decimal files
+   * using new Drill metadata file, i.e. generated after upgrade to parquet 1.10.0
+   */
+  @Test
+  public void testOldIntDecimalPushDownWithNewMeta() throws Exception {
+    String table = createTable("old_int_decimal_push_down_with_new_meta", false);
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      // push down does not work for old int decimal types because stats is not read: PARQUET-1322
+      assertTrue(plan.contains("numRowGroups=2"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for new int_32 and int_64 decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testNewIntDecimalPushDownNoMeta() throws Exception {
+    String dataTable = createTable("data_table_int_decimal_push_down_no_meta", true);
+    String table = "dfs.`tmp`.new_int_decimal_push_down_no_meta";
+    tablesToDrop.add(table);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", table, dataTable)).run();
+
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for new int_32 and int_64 decimal files
+   * using Drill metadata file.
+   */
+  @Test
+  public void testNewIntDecimalPushDownWithMeta() throws Exception {
+    String dataTable = createTable("data_table_int_decimal_push_down_no_meta", true);
+    String table = "dfs.`tmp`.new_int_decimal_push_down_with_meta";
+    tablesToDrop.add(table);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", table, dataTable)).run();
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.0 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("20.00"), new BigDecimal("20.00"))
+        .go();
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new fixed decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldNewFixedDecimalPruningNoMeta() throws Exception {
+    String oldTable = createTable("old_fixed_decimal_pruning_no_meta", true);
+    String newTable = "dfs.`tmp`.new_fixed_decimal_pruning_no_meta";
+    tablesToDrop.add(newTable);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "fixed_len_byte_array");
+    queryBuilder().sql(String.format("create table %s partition by (part_fixed) as select part_fixed, val_fixed from %s",
+      newTable, oldTable)).run();
+
+    for (String table : Arrays.asList(oldTable, newTable)) {
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part_fixed", "val_fixed")
+          .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+          .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+          .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+          .go();
+      }
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new fixed decimal files
+   * using old Drill metadata file, i.e. generated before upgrade to parquet 1.10.0.
+   */
+  @Test
+  public void testOldFixedDecimalPruningWithOldMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_pruning_with_old_meta", false);
+    for (String optionValue : Arrays.asList("true", "false", "")) {
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+      String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+      assertFalse(plan.contains("Filter"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part_fixed", "val_fixed")
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new fixed decimal files
+   * using new Drill metadata file, i.e. generated after upgrade to parquet 1.10.0.
+   */
+  @Test
+  public void testOldNewFixedDecimalPruningWithNewMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_pruning_with_new_meta", false);
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+    for (String optionValue : Arrays.asList("true", "false", "")) {
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+      String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(2.00 as decimal(5, 2))", table);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+      assertFalse(plan.contains("Filter"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part_fixed", "val_fixed")
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("2.05"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old fixed decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldFixedDecimalPushDownNoMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_push_down_no_meta", true);
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table);
+    String plan = client.queryBuilder().sql(query).explainText();
+    // statistics for fixed decimal is not available for files generated prior to parquet 1.10.0 version
+    assertTrue(plan.contains("numRowGroups=2"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+  }
+
+  /**
+   * Check filter push down for old fixed decimal with
+   * using old Drill metadata file, i.e. created prior to upgrade to parquet 1.10.0.
+   */
+  @Test
+  public void testOldFixedDecimalPushDownWithOldMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_push_down_with_old_meta", false);
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("true", "numRowGroups=1");
+    properties.put("false", "numRowGroups=2");
+
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains(property.getValue()));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part_fixed", "val_fixed")
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old fixed decimal with
+   * using new Drill metadata file, i.e. created after upgrade to parquet 1.10.0.
+   */
+  @Test
+  public void testOldFixedDecimalPushDownWithNewMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_push_down_with_new_meta", true);
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(20.25 as decimal(5, 2))", table);
+
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=2"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25"))
+      .go();
+  }
+
+  /**
+   * Check filter push down for fixed decimal generated after upgrade to parquet 1.10.0
+   * with and without using Drill metadata file.
+   */
+  @Test
+  public void testNewFixedDecimalPushDown() throws Exception {
+    String dataTable = createTable("data_table_for_fixed_decimal_push_down_no_meta", true);
+    String newTable = "dfs.`tmp`.new_fixed_decimal_pruning";
+    tablesToDrop.add(newTable);
+
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "fixed_len_byte_array");
+    queryBuilder().sql(String.format("create table %s partition by (part_fixed) as select part_fixed, val_fixed from %s",
+      newTable, dataTable)).run();
+
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", newTable);
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+
+    queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+    // metadata for binary is allowed only after Drill 1.15.0
+    // set string signed option to true since test was written on Drill 1.15.0-SNAPSHOT version
+    client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
+    plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+  }
+
+  /**
+   * Check partition pruning for binary decimal with and without
+   * using Drill metadata file.
+   */
+  @Test
+  public void testBinaryDecimalPruning() throws Exception {
+    String dataTable = createTable("data_table for_binary_decimal_pruning_no_meta", true);
+    String newTable = "dfs.`tmp`.binary_decimal_pruning";
+    tablesToDrop.add(newTable);
+
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "binary");
+    queryBuilder().sql(String.format("create table %s partition by (part_binary) "
+      + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run();
+
+    String query = String.format("select part_binary, val_binary from %s where part_binary = cast(1.00 as decimal(5, 2))", newTable);
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+    assertFalse(plan.contains("Filter"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+      .go();
+
+    queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+    plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+    assertFalse(plan.contains("Filter"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+      .go();
+  }
+
+  /**
+   * Check filter push down for binary decimal with and without
+   * using Drill metadata file.
+   */
+  @Test
+  public void testBinaryDecimalPushDown() throws Exception {
+    String dataTable = createTable("data_table_for_binary_decimal_push_down_no_meta", true);
+    String newTable = "dfs.`tmp`.binary_decimal_push_down";
+    tablesToDrop.add(newTable);
+
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "binary");
+    queryBuilder().sql(String.format("create table %s partition by (part_binary) "
+      + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run();
+
+    String query = String.format("select part_binary, val_binary from %s where val_binary = cast(1.05 as decimal(5, 2))", newTable);
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+
+    queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+    // metadata for binary is allowed only after Drill 1.15.0
+    // set string signed option to true, since test was written on Drill 1.15.0-SNAPSHOT version
+    client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
+    plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+  }
+
+  /**
+   * Check partition pruning for decimals with different scale.
+   */
+  @Test
+  public void testDecimalPruningDifferentScale() throws Exception {
+    String dataTable = createTable("data_table_for_different_scale_pruning_check", true);
+    String newTable = "dfs.`tmp`.table_for_different_scale_pruning_check";
+    tablesToDrop.add(newTable);
+    queryBuilder().sql(String.format("create table %s partition by (part) as select part_int_32 as part, val_int_32 as val from %s",
+      newTable, dataTable)).run();
+
+    for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) {
+      String query = String.format("select part, val from %s where part = cast(2.0 as %s)", newTable, decimalType);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+      assertFalse(plan.contains("Filter"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part", "val")
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("2.05"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for decimals with different scale.
+   */
+  @Test
+  public void testDecimalPushDownDifferentScale() throws Exception {
+    String dataTable = createTable("data_table_for_different_scale_push_down_check", true);
+    String newTable = "dfs.`tmp`.table_for_different_scale_push_down_check";
+    tablesToDrop.add(newTable);
+    queryBuilder().sql(String.format("create table %s partition by (part) as select part_int_32 as part, val_int_32 as val from %s",
+      newTable, dataTable)).run();
+
+    for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) {
+      String query = String.format("select part, val from %s where val = cast(20.0 as %s)", newTable, decimalType);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part", "val")
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00"))
+        .go();
+    }
+  }
+
+  @Ignore("Statistics for DECIMAL that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0")
+  @Test
+  public void testDecimalPruningWithNullPartition() throws Exception {
+    List<String> ctasQueries = new ArrayList<>();
+    // decimal stores as fixed_len_byte_array
+    ctasQueries.add("create table %s partition by (manager_id) as " +
+      "select * from cp.`parquet/fixedlenDecimal.parquet`");
+    // decimal stores as int32
+    ctasQueries.add("create table %s partition by (manager_id) as " +
+      "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+      "from cp.`parquet/fixedlenDecimal.parquet`");
+    // decimal stores as int64
+    ctasQueries.add("create table %s partition by (manager_id) as " +
+      "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+      "from cp.`parquet/fixedlenDecimal.parquet`");
+    final String decimalPartitionTable = "dfs.tmp.`decimal_optional_partition`";
+    for (String ctasQuery : ctasQueries) {
+      try {
+        queryBuilder().sql(String.format(ctasQuery, decimalPartitionTable)).run();
+
+        String query = String.format("select * from %s where manager_id = 148", decimalPartitionTable);
+        int expectedRowCount = 6;
+
+        long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
+        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
+        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+        queryBuilder().sql(String.format("refresh table metadata %s", decimalPartitionTable)).run();
+
+        actualRowCount = client.queryBuilder().sql(query).run().recordCount();
+        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
+        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      } finally {
+        client.runSqlSilently(String.format("drop table if exists %s", decimalPartitionTable));
+      }
+    }
+  }
+
+  private String createTable(String tableName, boolean removeMetadata) throws IOException {
+    File rootDir = dirTestWatcher.getRootDir();
+    File tablePath = new File(rootDir, String.format("%s_%s", tableName, UUID.randomUUID()));
+    FileUtils.copyDirectory(fileStore, tablePath);
+    File metadata = new File(tablePath, ".drill.parquet_metadata");
+    if (removeMetadata) {
+      assertTrue(metadata.delete());
+    } else {
+      // metadata modification time should be higher
+      // than directory modification time otherwise metadata file will be regenerated
+      assertTrue(metadata.setLastModified(Instant.now().toEpochMilli()));
+    }
+    String table = String.format("dfs.`root`.`%s`", tablePath.getName());
+    tablesToDrop.add(table);
+    return table;
+  }
+
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
new file mode 100644
index 00000000000..a039468450c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestPushDownAndPruningForVarchar extends ClusterTest {
+
+  private static File fileStore;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    /*
+      Contains two data files generated by Drill 1.13.0 version
+      (before upgrade to Parquet lib 1.10.0).
+      Each file has two varchar columns.
+
+      0_0_1.parquet       0_0_2.parquet
+      -----------         -----------
+      part | val          part | val
+      -----------         -----------
+      A    | A1           B    | B1
+      A    | A2           B    | B2
+
+      Also contains .drill.parquet_metadata generated for these two files.
+     */
+    fileStore = dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "varchar_gen_1_13_0"));
+    startCluster(builder);
+  }
+
+  @Test
+  public void testOldFilesPruningWithAndWithoutMeta() throws Exception {
+    String tableNoMeta = createTable("varchar_pruning_old_without_meta", true);
+    String tableWithMeta = createTable("varchar_pruning_old_with_meta", false);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(tableNoMeta, "false");
+    properties.put(tableWithMeta, "true");
+
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        for (String optionValue : Arrays.asList("true", "false", "")) {
+          client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+          String query = String.format("select * from %s where part = 'A'", property.getKey());
+          String plan = client.queryBuilder().sql(query).explainText();
+          assertTrue(plan.contains("numRowGroups=1"));
+          assertTrue(plan.contains(String.format("usedMetadataFile=%s", property.getValue())));
+          assertFalse(plan.contains("Filter"));
+
+          client.testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("part", "val")
+            .baselineValues("A", "A1")
+            .baselineValues("A", "A2")
+            .go();
+        }
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+
+      properties.keySet().
+        forEach(k -> client.runSqlSilently(String.format("drop table if exists %s", k)));
+    }
+  }
+
+  @Test
+  public void testOldFilesPruningWithNewMeta() throws Exception {
+    String table = createTable("varchar_pruning_old_with_new_meta", true);
+
+    try {
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+        String query = String.format("select * from %s where part = 'A'", table);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .baselineValues("A", "A2")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testNewFilesPruningNoMeta() throws Exception {
+    String oldTable = createTable("varchar_pruning_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_pruning_new_without_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select * from %s where part = 'A'", newTable);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .baselineValues("A", "A2")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  @Test
+  public void testNewFilesPruningWithNewMeta() throws Exception {
+    String oldTable = createTable("varchar_pruning_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_pruning_new_with_new_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+      queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select * from %s where part = 'A'", newTable);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .baselineValues("A", "A2")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  @Ignore("Statistics for VARCHAR that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0.")
+  @Test
+  public void testNewFilesPruningWithNullPartition() throws Exception {
+    String table = "dfs.`tmp`.`varchar_pruning_new_with_null_partition`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (col_vrchr) as " +
+        "select * from cp.`parquet/alltypes_optional.parquet`", table)).run();
+
+      String query = String.format("select * from %s where col_vrchr = 'Nancy Cloke'", table);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("usedMetadataFile=false"));
+      assertFalse(plan.contains("Filter"));
+
+      QueryBuilder.QuerySummary result = client.queryBuilder().sql(query).run();
+      assertTrue(result.succeeded());
+      assertEquals(1, result.recordCount());
+
+      queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+      plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("usedMetadataFile=true"));
+      assertFalse(plan.contains("Filter"));
+
+      result = client.queryBuilder().sql(query).run();
+      assertTrue(result.succeeded());
+      assertEquals(1, result.recordCount());
+    } finally {
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testOldFilesPushDownNoMeta() throws Exception {
+    String table = createTable("varchar_push_down_old_without_meta", true);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("true", "numRowGroups=1");
+    properties.put("false", "numRowGroups=2");
+
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+        String query = String.format("select * from %s where val = 'A1'", table);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains(property.getValue()));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testOldFilesPushDownWithOldMeta() throws Exception {
+    String table = createTable("varchar_push_down_old_with_old_meta", false);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("false", "numRowGroups=2");
+    properties.put("true", "numRowGroups=1");
+
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+        String query = String.format("select * from %s where val = 'A1'", table);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains(property.getValue()));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testNewFilesPushDownNoMeta() throws Exception {
+    String oldTable = createTable("varchar_push_down_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_push_down_new_without_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select * from %s where val = 'A1'", newTable);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  @Test
+  public void testNewFilesPushDownWithMeta() throws Exception {
+    String oldTable = createTable("varchar_push_down_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_push_down_new_with_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+      queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+      String query = String.format("select * from %s where val = 'A1'", newTable);
+      // metadata for binary is allowed only after Drill 1.15.0
+      // set string signed option to true, to read it on current Drill 1.15.0-SNAPSHOT version
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part", "val")
+        .baselineValues("A", "A1")
+        .go();
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  private String createTable(String tableName, boolean removeMetadata) throws IOException {
+    File rootDir = dirTestWatcher.getRootDir();
+    File table = new File(rootDir, String.format("%s_%s", tableName, UUID.randomUUID()));
+    FileUtils.copyDirectory(fileStore, table);
+    File metadata = new File(table, ".drill.parquet_metadata");
+    if (removeMetadata) {
+      assertTrue(metadata.delete());
+    } else {
+      // metadata modification time should be higher
+      // than directory modification time otherwise metadata file will be regenerated
+      assertTrue(metadata.setLastModified(Instant.now().toEpochMilli()));
+    }
+    return String.format("dfs.`root`.`%s`", table.getName());
+  }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 8e045a27159..996898ee470 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -167,7 +167,7 @@ public Properties getClientProps() {
   private void configureZk() {
     // Start ZK if requested.
 
-    String zkConnect = null;
+    String zkConnect;
     if (builder.zkHelper != null) {
       // Case where the test itself started ZK and we're only using it.
 
@@ -594,18 +594,22 @@ public static ClusterFixture standardCluster(BaseDirTestWatcher dirTestWatcher)
   /**
    * Convert a Java object (typically a boxed scalar) to a string
    * for use in SQL. Quotes strings but just converts others to
-   * string format.
+   * string format. If value to encode is null, return null.
    *
    * @param value the value to encode
    * @return the SQL-acceptable string equivalent
    */
 
   public static String stringify(Object value) {
+    if (value == null) {
+      return null;
+    }
+
     if (value instanceof String) {
       return "'" + value + "'";
-    } else {
-      return value.toString();
     }
+
+    return value.toString();
   }
 
   public static String getResource(String resource) throws IOException {
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/.drill.parquet_metadata b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/.drill.parquet_metadata
new file mode 100644
index 00000000000..24c3fc12ae0
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/.drill.parquet_metadata
@@ -0,0 +1,146 @@
+{
+  "metadata_version" : "3.3",
+  "columnTypeInfo" : {
+    "`val_int_64`" : {
+      "name" : [ "val_int_64" ],
+      "primitiveType" : "INT64",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`val_int_32`" : {
+      "name" : [ "val_int_32" ],
+      "primitiveType" : "INT32",
+      "originalType" : "DECIMAL",
+      "precision" : 5,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`part_fixed`" : {
+      "name" : [ "part_fixed" ],
+      "primitiveType" : "FIXED_LEN_BYTE_ARRAY",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`part_int_64`" : {
+      "name" : [ "part_int_64" ],
+      "primitiveType" : "INT64",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`part_int_32`" : {
+      "name" : [ "part_int_32" ],
+      "primitiveType" : "INT32",
+      "originalType" : "DECIMAL",
+      "precision" : 5,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`val_fixed`" : {
+      "name" : [ "val_fixed" ],
+      "primitiveType" : "FIXED_LEN_BYTE_ARRAY",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    }
+  },
+  "files" : [ {
+    "path" : "0_0_2.parquet",
+    "length" : 1072,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 390,
+      "rowCount" : 3,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part_int_32" ],
+        "minValue" : 200,
+        "maxValue" : 200,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_32" ],
+        "minValue" : 205,
+        "maxValue" : 2025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_int_64" ],
+        "minValue" : 200,
+        "maxValue" : 200,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_64" ],
+        "minValue" : 205,
+        "maxValue" : 2025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAADI",
+        "maxValue" : "AAAAAAAAAAAAAADI",
+        "nulls" : 0
+      }, {
+        "name" : [ "val_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAADN",
+        "maxValue" : "AAAAAAAAAAAAAAfp",
+        "nulls" : 0
+      } ]
+    } ]
+  }, {
+    "path" : "0_0_1.parquet",
+    "length" : 1072,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 390,
+      "rowCount" : 3,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part_int_32" ],
+        "minValue" : 100,
+        "maxValue" : 100,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_32" ],
+        "minValue" : 105,
+        "maxValue" : 1025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_int_64" ],
+        "minValue" : 100,
+        "maxValue" : 100,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_64" ],
+        "minValue" : 105,
+        "maxValue" : 1025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAABk",
+        "maxValue" : "AAAAAAAAAAAAAABk",
+        "nulls" : 0
+      }, {
+        "name" : [ "val_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAABp",
+        "maxValue" : "AAAAAAAAAAAAAAQB",
+        "nulls" : 0
+      } ]
+    } ]
+  } ],
+  "directories" : [ ],
+  "drillVersion" : "1.13.0"
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_1.parquet
new file mode 100644
index 00000000000..c5f6e899132
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_1.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_2.parquet
new file mode 100644
index 00000000000..cd8a9301aa6
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_2.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/.drill.parquet_metadata b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/.drill.parquet_metadata
new file mode 100644
index 00000000000..9d8525b26b6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/.drill.parquet_metadata
@@ -0,0 +1,70 @@
+{
+  "metadata_version" : "3.3",
+  "columnTypeInfo" : {
+    "`val`" : {
+      "name" : [ "val" ],
+      "primitiveType" : "BINARY",
+      "originalType" : "UTF8",
+      "precision" : 0,
+      "scale" : 0,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 1
+    },
+    "`part`" : {
+      "name" : [ "part" ],
+      "primitiveType" : "BINARY",
+      "originalType" : "UTF8",
+      "precision" : 0,
+      "scale" : 0,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 1
+    }
+  },
+  "files" : [ {
+    "path" : "0_0_2.parquet",
+    "length" : 368,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 90,
+      "rowCount" : 2,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part" ],
+        "minValue" : "Qg==",
+        "maxValue" : "Qg==",
+        "nulls" : 0
+      }, {
+        "name" : [ "val" ],
+        "minValue" : "QjE=",
+        "maxValue" : "QjI=",
+        "nulls" : 0
+      } ]
+    } ]
+  }, {
+    "path" : "0_0_1.parquet",
+    "length" : 368,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 90,
+      "rowCount" : 2,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part" ],
+        "minValue" : "QQ==",
+        "maxValue" : "QQ==",
+        "nulls" : 0
+      }, {
+        "name" : [ "val" ],
+        "minValue" : "QTE=",
+        "maxValue" : "QTI=",
+        "nulls" : 0
+      } ]
+    } ]
+  } ],
+  "directories" : [ ],
+  "drillVersion" : "1.13.0"
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_1.parquet
new file mode 100644
index 00000000000..bf7f8d8ebd3
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_1.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_2.parquet
new file mode 100644
index 00000000000..c76ed91b32f
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_2.parquet differ
diff --git a/pom.xml b/pom.xml
index a75e251d2f7..902e3c81303 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,7 +63,6 @@
     <hamcrest.core.version>1.3</hamcrest.core.version>
     <maven.embedder.version>3.5.3</maven.embedder.version>
     <curator.version>2.7.1</curator.version>
-    <parquet.hadoop.version>1.8.1</parquet.hadoop.version>
     <wiremock.standalone.version>2.5.1</wiremock.standalone.version>
     <jmockit.version>1.39</jmockit.version>
     <logback.version>1.0.13</logback.version>
@@ -376,6 +375,7 @@
             <exclude>**/*.autotools</exclude>
             <exclude>**/*.cproject</exclude>
             <exclude>**/*.drill</exclude>
+            <exclude>**/.drill.parquet_metadata</exclude>
             <!-- TODO DRILL-4336: try to avoid the need to add this -->
             <exclude>dependency-reduced-pom.xml</exclude>
           </excludes>
@@ -646,6 +646,7 @@
               <exclude>**/NOTICE</exclude>
               <exclude>KEYS</exclude>
               <exclude>header</exclude>
+              <exclude>**/.drill.parquet_metadata</exclude>
               <!-- TODO DRILL-4336: try to avoid the need to add this -->
               <exclude>dependency-reduced-pom.xml</exclude>
             </excludes>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services