Posted to commits@drill.apache.org by vi...@apache.org on 2018/11/15 23:51:34 UTC

[drill] 05/07: DRILL-6744: Support varchar and decimal push down

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 527f1fd2452fdb3fd30c0cc154222a099e352f93
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Wed Oct 31 20:24:03 2018 +0200

    DRILL-6744: Support varchar and decimal push down
    
    1. Added enableStringsSignedMinMax parquet format plugin config and store.parquet.reader.strings_signed_min_max session option to control reading binary statistics for files generated by Parquet library versions prior to 1.10.0 (see the usage example after this message).
    2. Added ParquetReaderConfig to store the configuration needed when reading parquet statistics or files.
    3. Provided a mechanism to enable varchar / decimal filter push down.
    4. Added VersionUtil to compare Drill versions in string representation.
    5. Added appropriate unit tests.
    
    closes #1537
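
    A minimal usage sketch (not part of the patch itself): the new session option can be enabled before querying Parquet files written by a pre-1.10.0 library when the binary columns are known to hold plain ASCII data, and reset afterwards. The table, columns and predicate below mirror the hive.kv_native test added in this commit; the ALTER SESSION syntax is standard Drill SQL.

        -- allow the planner to use binary min/max statistics for varchar/decimal push down
        ALTER SESSION SET `store.parquet.reader.strings_signed_min_max` = 'true';

        -- with the option enabled, the filter can be pruned down to a single row group
        SELECT int_key FROM hive.kv_native WHERE var_key = 'var_1';

        -- return to the default (unset) behaviour
        ALTER SESSION RESET `store.parquet.reader.strings_signed_min_max`;

    The enableStringsSignedMinMax setting in the Parquet format plugin config controls the same behaviour at the storage plugin level.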
---
 .../exec/store/mapr/db/MapRDBFormatPlugin.java     |   2 +-
 .../ConvertHiveParquetScanToDrillParquetScan.java  |  10 +-
 .../hive/HiveDrillNativeParquetRowGroupScan.java   |  17 +-
 .../store/hive/HiveDrillNativeParquetScan.java     |  19 +-
 .../exec/TestHiveDrillNativeParquetReader.java     |  41 ++
 .../exec/store/hive/HiveTestDataGenerator.java     |  10 +-
 .../java/org/apache/drill/exec/ExecConstants.java  |   8 +
 .../exec/expr/stat/ParquetComparisonPredicate.java |  59 +-
 .../drill/exec/expr/stat/RangeExprEvaluator.java   |  68 +-
 .../sql/handlers/RefreshMetadataHandler.java       |  14 +-
 .../exec/server/options/SystemOptionManager.java   |   1 +
 .../drill/exec/store/dfs/FileSystemPlugin.java     |  16 +-
 .../apache/drill/exec/store/dfs/FormatPlugin.java  |  29 +-
 .../store/parquet/AbstractParquetGroupScan.java    |  22 +-
 .../store/parquet/AbstractParquetRowGroupScan.java |  17 +-
 .../parquet/AbstractParquetScanBatchCreator.java   |  32 +-
 .../exec/store/parquet/ParquetFilterBuilder.java   |  61 +-
 .../exec/store/parquet/ParquetFormatConfig.java    |  33 +-
 .../exec/store/parquet/ParquetFormatPlugin.java    |  26 +-
 .../drill/exec/store/parquet/ParquetGroupScan.java |  30 +-
 .../exec/store/parquet/ParquetPushDownFilter.java  |  18 +-
 .../exec/store/parquet/ParquetReaderConfig.java    | 202 ++++++
 .../exec/store/parquet/ParquetReaderUtility.java   |  95 +--
 .../exec/store/parquet/ParquetRowGroupScan.java    |  14 +-
 .../exec/store/parquet/metadata/Metadata.java      |  66 +-
 .../parquet/stat/ParquetMetaStatCollector.java     | 244 +++++--
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../org/apache/drill/TestCTASPartitionFilter.java  |   2 +-
 .../drill/exec/store/FormatPluginSerDeTest.java    |   4 +-
 .../store/dfs/TestFormatPluginOptionExtractor.java |   2 +-
 .../store/parquet/TestParquetMetadataCache.java    |  70 +-
 .../store/parquet/TestParquetReaderConfig.java     | 125 ++++
 .../parquet/TestPushDownAndPruningForDecimal.java  | 720 +++++++++++++++++++++
 .../parquet/TestPushDownAndPruningForVarchar.java  | 361 +++++++++++
 .../java/org/apache/drill/test/ClusterFixture.java |  12 +-
 .../decimal_gen_1_13_0/.drill.parquet_metadata     | 146 +++++
 .../parquet/decimal_gen_1_13_0/0_0_1.parquet       | Bin 0 -> 1072 bytes
 .../parquet/decimal_gen_1_13_0/0_0_2.parquet       | Bin 0 -> 1072 bytes
 .../varchar_gen_1_13_0/.drill.parquet_metadata     |  70 ++
 .../parquet/varchar_gen_1_13_0/0_0_1.parquet       | Bin 0 -> 368 bytes
 .../parquet/varchar_gen_1_13_0/0_0_2.parquet       | Bin 0 -> 368 bytes
 pom.xml                                            |   3 +-
 42 files changed, 2304 insertions(+), 366 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index fc8a057..b5cff58 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -131,7 +131,7 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
   @Override
   public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
       List<SchemaPath> columns) throws IOException {
-    return getGroupScan(userName, selection, columns, null /* indexDesc */);
+    return getGroupScan(userName, selection, columns, (IndexDesc) null /* indexDesc */);
   }
 
   @JsonIgnore
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index ea71157..7286d7a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -32,11 +32,13 @@ import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.sql.DrillSqlOperator;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider;
 import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -97,7 +99,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
       }
 
       final Map<String, String> partitionColMapping = getPartitionColMapping(hiveTable, partitionColumnLabel);
-      final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits);
+      final DrillScanRel nativeScanRel = createNativeScanRel(partitionColMapping, hiveScanRel, logicalInputSplits, settings.getOptions());
       if (hiveScanRel.getRowType().getFieldCount() == 0) {
         call.transformTo(nativeScanRel);
       } else {
@@ -138,7 +140,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
    */
   private DrillScanRel createNativeScanRel(final Map<String, String> partitionColMapping,
                                            final DrillScanRel hiveScanRel,
-                                           final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits) throws Exception {
+                                           final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits,
+                                           final OptionManager options) throws Exception {
 
     final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
     final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
@@ -178,7 +181,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
             nativeScanCols,
             hiveScan.getStoragePlugin(),
             logicalInputSplits,
-            hiveScan.getConfProperties());
+            hiveScan.getConfProperties(),
+            ParquetReaderConfig.builder().withOptions(options).build());
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index 2e8f95d..bea06e0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -56,6 +57,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             @JsonProperty("columns") List<SchemaPath> columns,
                                             @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                             @JsonProperty("confProperties") Map<String, String> confProperties,
+                                            @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
@@ -63,6 +65,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
         columns,
         hivePartitionHolder,
         confProperties,
+        readerConfig,
         filter);
   }
 
@@ -72,8 +75,9 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
                                             List<SchemaPath> columns,
                                             HivePartitionHolder hivePartitionHolder,
                                             Map<String, String> confProperties,
+                                            ParquetReaderConfig readerConfig,
                                             LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, filter);
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
@@ -103,7 +107,8 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
+      confProperties, readerConfig, filter);
   }
 
   @Override
@@ -113,12 +118,8 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
-  }
-
-  @Override
-  public boolean areCorruptDatesAutoCorrected() {
-    return true;
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder,
+      confProperties, readerConfig, filter);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 1db8df2..617f6a5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -37,7 +38,6 @@ import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
 import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.RowGroupInfo;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -73,8 +73,9 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
                                     @JsonProperty("entries") List<ReadEntryWithPath> entries,
                                     @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
                                     @JsonProperty("confProperties") Map<String, String> confProperties,
+                                    @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                                     @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
-    super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
     this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
     this.hivePartitionHolder = hivePartitionHolder;
     this.confProperties = confProperties;
@@ -86,8 +87,9 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
-                                    Map<String, String> confProperties) throws IOException {
-    this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE);
+                                    Map<String, String> confProperties,
+                                    ParquetReaderConfig readerConfig) throws IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, readerConfig, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public HiveDrillNativeParquetScan(String userName,
@@ -95,8 +97,9 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
                                     Map<String, String> confProperties,
+                                    ParquetReaderConfig readerConfig,
                                     LogicalExpression filter) throws IOException {
-    super(userName, columns, new ArrayList<>(), filter);
+    super(userName, columns, new ArrayList<>(), readerConfig, filter);
 
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.hivePartitionHolder = new HivePartitionHolder();
@@ -154,7 +157,8 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       List<String> values = hivePartitionHolder.get(readEntry.getPath());
       subPartitionHolder.add(readEntry.getPath(), values);
     }
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder,
+      confProperties, readerConfig, filter);
   }
 
   @Override
@@ -199,7 +203,6 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
 
   @Override
   protected void initInternal() throws IOException {
-    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
     Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
     for (ReadEntryWithPath entry : entries) {
       Path path = new Path(entry.getPath());
@@ -209,7 +212,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
       FileSystem fs = path.getFileSystem(conf);
       fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
     }
-    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, formatConfig);
+    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, readerConfig);
   }
 
   @Override
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index ea8d5df..1a02eb4 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -21,6 +21,8 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.drill.PlanTestBase;
 import org.apache.drill.categories.HiveStorageTest;
@@ -260,4 +262,43 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase {
     }
   }
 
+  @Test
+  public void testHiveVarcharPushDown() throws Exception {
+    String query = "select int_key from hive.kv_native where var_key = 'var_1'";
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("true", "numRowGroups=1");
+    properties.put("false", "numRowGroups=4"); // Hive creates parquet files using Parquet lib older than 1.10.0
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+        testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", property.getValue()});
+
+        testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("int_key")
+          .baselineValues(1)
+          .go();
+      }
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+    }
+  }
+
+  @Test
+  public void testHiveDecimalPushDown() throws Exception {
+    String query = "select int_key from hive.kv_native where dec_key = cast(1.11 as decimal(5, 2))";
+    // Hive generates parquet files using parquet lib older than 1.10.0
+    // thus statistics for decimal is not available
+    testPlanMatchingPatterns(query, new String[]{"HiveDrillNativeParquetScan", "numRowGroups=4"});
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("int_key")
+      .baselineValues(1)
+      .go();
+  }
+
 }
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 65d1700..84fa368 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -552,12 +552,12 @@ public class HiveTestDataGenerator {
 
   private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
     // Hive managed table that has data qualified for Drill native filter push down
-    executeQuery(hiveDriver, "create table kv_native(key int, sub_key int) stored as parquet");
+    executeQuery(hiveDriver, "create table kv_native(key int, int_key int, var_key varchar(10), dec_key decimal(5, 2)) stored as parquet");
     // each insert is created in separate file
-    executeQuery(hiveDriver, "insert into table kv_native values (1, 1), (1, 2)");
-    executeQuery(hiveDriver, "insert into table kv_native values (1, 3), (1, 4)");
-    executeQuery(hiveDriver, "insert into table kv_native values (2, 5), (2, 6)");
-    executeQuery(hiveDriver, "insert into table kv_native values (null, 9), (null, 10)");
+    executeQuery(hiveDriver, "insert into table kv_native values (1, 1, 'var_1', 1.11), (1, 2, 'var_2', 2.22)");
+    executeQuery(hiveDriver, "insert into table kv_native values (1, 3, 'var_3', 3.33), (1, 4, 'var_4', 4.44)");
+    executeQuery(hiveDriver, "insert into table kv_native values (2, 5, 'var_5', 5.55), (2, 6, 'var_6', 6.66)");
+    executeQuery(hiveDriver, "insert into table kv_native values (null, 7, 'var_7', 7.77), (null, 8, 'var_8', 8.88)");
 
     // Hive external table which has three partitions
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index e7f0cd2..c65ce2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -335,6 +335,14 @@ public final class ExecConstants {
   public static final OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP,
       new OptionDescription("Enables Drill to implicitly interpret the INT96 timestamp data type in Parquet files."));
 
+  public static final String PARQUET_READER_STRINGS_SIGNED_MIN_MAX = "store.parquet.reader.strings_signed_min_max";
+  public static final StringValidator PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR = new EnumeratedStringValidator(PARQUET_READER_STRINGS_SIGNED_MIN_MAX,
+    new OptionDescription("Allows binary statistics usage for files created prior to 1.9.1 parquet library version where " +
+      "statistics was incorrectly calculated for UTF-8 data. For cases when user exactly knows " +
+      "that data in binary columns is in ASCII (not UTF-8), turning this property to 'true' " +
+      "enables statistics usage for varchar and decimal data types. Default is unset, i.e. empty string. " +
+      "Allowed values: 'true', 'false', '' (empty string)."), "true", "false", "");
+
   public static final String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
   public static final OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC,
       new OptionDescription("Enable the asynchronous page reader. This pipelines the reading of data from disk for high performance."));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 531cbab..17bf851 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -20,9 +20,13 @@ package org.apache.drill.exec.expr.stat;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.LogicalExpressionBase;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.parquet.column.statistics.Statistics;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -100,10 +104,46 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     if (!leftStat.hasNonNullValue() || !rightStat.hasNonNullValue()) {
       return RowsMatch.SOME;
     }
+
+    if (left.getMajorType().getMinorType() == TypeProtos.MinorType.VARDECIMAL) {
+      /*
+        to compare correctly two decimal statistics we need to ensure that min and max values have the same scale,
+        otherwise adjust statistics to the highest scale
+        since decimal value is stored as unscaled we need to move dot to the right on the difference between scales
+       */
+      int leftScale = left.getMajorType().getScale();
+      int rightScale = right.getMajorType().getScale();
+      if (leftScale > rightScale) {
+        rightStat = adjustDecimalStatistics(rightStat, leftScale - rightScale);
+      } else if (leftScale < rightScale) {
+        leftStat = adjustDecimalStatistics(leftStat, rightScale - leftScale);
+      }
+    }
+
     return predicate.apply(leftStat, rightStat);
   }
 
   /**
+   * Creates decimal statistics where min and max values are re-created using given scale.
+   *
+   * @param statistics statistics that needs to be adjusted
+   * @param scale adjustment scale
+   * @return adjusted statistics
+   */
+  @SuppressWarnings("unchecked")
+  private Statistics<C> adjustDecimalStatistics(Statistics<C> statistics, int scale) {
+    byte[] minBytes = new BigDecimal(new BigInteger(statistics.getMinBytes()))
+      .setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+    byte[] maxBytes = new BigDecimal(new BigInteger(statistics.getMaxBytes()))
+      .setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+    return (Statistics<C>) Statistics.getBuilderForReading(statistics.type())
+        .withMin(minBytes)
+        .withMax(maxBytes)
+        .withNumNulls(statistics.getNumNulls())
+        .build();
+  }
+
+  /**
    * If one rowgroup contains some null values, change the RowsMatch.ALL into RowsMatch.SOME (null values should be discarded by filter)
    */
   private static RowsMatch checkNull(Statistics leftStat, Statistics rightStat) {
@@ -117,9 +157,22 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
       LogicalExpression left,
       LogicalExpression right
   ) {
-    return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) ->
-      leftStat.compareMaxToValue(rightStat.genericGetMin()) < 0 || rightStat.compareMaxToValue(leftStat.genericGetMin()) < 0 ? RowsMatch.NONE : RowsMatch.SOME
-    ) {
+    return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
+      // compare left max and right min
+      int leftToRightComparison = leftStat.compareMaxToValue(rightStat.genericGetMin());
+      // compare right max and left min
+      int rightToLeftComparison = rightStat.compareMaxToValue(leftStat.genericGetMin());
+
+      // if both comparison results are equal to 0 and both statistics have no nulls,
+      // it means that min and max values in each statistics are the same and match each other,
+      // return that all rows match the condition
+      if (leftToRightComparison == 0 && rightToLeftComparison == 0 && hasNoNulls(leftStat) && hasNoNulls(rightStat)) {
+        return RowsMatch.ALL;
+      }
+
+      // if at least one comparison result is negative, it means that none of the rows match the condition
+      return leftToRightComparison < 0 || rightToLeftComparison < 0 ? RowsMatch.NONE : RowsMatch.SOME;
+    }) {
       @Override
       public String toString() {
         return left + " = " + right;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java
index 48a7a90..aa7af42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/RangeExprEvaluator.java
@@ -39,24 +39,28 @@ import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.store.parquet.stat.ColumnStatistics;
 import org.apache.drill.exec.vector.ValueHolderHelper;
+import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.DoubleStatistics;
 import org.apache.parquet.column.statistics.FloatStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.DecimalMetadata;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVisitor<Statistics<T>, Void, RuntimeException> {
-  static final Logger logger = LoggerFactory.getLogger(RangeExprEvaluator.class);
+  private static final Logger logger = LoggerFactory.getLogger(RangeExprEvaluator.class);
 
   private final Map<SchemaPath, ColumnStatistics<T>> columnStatMap;
   private final long rowCount;
@@ -135,6 +139,18 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   }
 
   @Override
+  public Statistics<T> visitQuotedStringConstant(ValueExpressions.QuotedString quotedString, Void value) throws RuntimeException {
+    String stringValue = quotedString.getString();
+    return getStatistics(stringValue);
+  }
+
+  @Override
+  public Statistics<T> visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) throws RuntimeException {
+    DecimalMetadata decimalMeta = new DecimalMetadata(decExpr.getMajorType().getPrecision(), decExpr.getMajorType().getScale());
+    return getStatistics(decExpr.getBigDecimal(), decimalMeta);
+  }
+
+  @Override
   public Statistics<T> visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Void value) throws RuntimeException {
     FuncHolder funcHolder = holderExpr.getHolder();
 
@@ -161,7 +177,7 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(int min, int max) {
     final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.INT32);
-    ((IntStatistics)statistics).setMinMax(min, max);
+    ((IntStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -172,7 +188,7 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(boolean min, boolean max) {
     Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.BOOLEAN);
-    ((BooleanStatistics)statistics).setMinMax(min, max);
+    ((BooleanStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -183,7 +199,7 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(long min, long max) {
     final Statistics statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.INT64);
-    ((LongStatistics)statistics).setMinMax(min, max);
+    ((LongStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -194,7 +210,7 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(double min, double max) {
     final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.DOUBLE);
-    ((DoubleStatistics)statistics).setMinMax(min, max);
+    ((DoubleStatistics) statistics).setMinMax(min, max);
     return statistics;
   }
 
@@ -205,10 +221,40 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   @SuppressWarnings("unchecked")
   private Statistics<T> getStatistics(float min, float max) {
     final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.FLOAT);
-    ((FloatStatistics)statistics).setMinMax(min, max);
+    ((FloatStatistics) statistics).setMinMax(min, max);
+    return statistics;
+  }
+
+  private Statistics<T> getStatistics(String value) {
+    return getStatistics(value, value);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Statistics<T> getStatistics(String min, String max) {
+    final Statistics<T> statistics = Statistics.getStatsBasedOnType(PrimitiveType.PrimitiveTypeName.BINARY);
+    ((BinaryStatistics) statistics).setMinMaxFromBytes(min.getBytes(), max.getBytes());
     return statistics;
   }
 
+  private Statistics<T> getStatistics(BigDecimal value, DecimalMetadata decimalMetadata) {
+    return getStatistics(value, value, decimalMetadata);
+  }
+
+  @SuppressWarnings("unchecked")
+  private Statistics<T> getStatistics(BigDecimal min, BigDecimal max, DecimalMetadata decimalMetadata) {
+    PrimitiveType decimalType = org.apache.parquet.schema.Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+      .as(OriginalType.DECIMAL)
+      .precision(decimalMetadata.getPrecision())
+      .scale(decimalMetadata.getScale())
+      .named("decimal_type");
+
+    return (Statistics<T>) Statistics.getBuilderForReading(decimalType)
+        .withMin(min.unscaledValue().toByteArray())
+        .withMax(max.unscaledValue().toByteArray())
+        .withNumNulls(0)
+        .build();
+  }
+
   private Statistics<T> evalCastFunc(FunctionHolderExpression holderExpr, Statistics input) {
     try {
       DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
@@ -288,31 +334,31 @@ public class RangeExprEvaluator<T extends Comparable<T>> extends AbstractExprVis
   private static final Map<TypeProtos.MinorType, Set<TypeProtos.MinorType>> CAST_FUNC = new HashMap<>();
   static {
     // float -> double , int, bigint
-    CAST_FUNC.put(TypeProtos.MinorType.FLOAT4, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.FLOAT4, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT4).add(TypeProtos.MinorType.FLOAT8);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT4).add(TypeProtos.MinorType.INT);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT4).add(TypeProtos.MinorType.BIGINT);
 
     // double -> float, int, bigint
-    CAST_FUNC.put(TypeProtos.MinorType.FLOAT8, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.FLOAT8, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT8).add(TypeProtos.MinorType.FLOAT4);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT8).add(TypeProtos.MinorType.INT);
     CAST_FUNC.get(TypeProtos.MinorType.FLOAT8).add(TypeProtos.MinorType.BIGINT);
 
     // int -> float, double, bigint
-    CAST_FUNC.put(TypeProtos.MinorType.INT, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.INT, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.INT).add(TypeProtos.MinorType.FLOAT4);
     CAST_FUNC.get(TypeProtos.MinorType.INT).add(TypeProtos.MinorType.FLOAT8);
     CAST_FUNC.get(TypeProtos.MinorType.INT).add(TypeProtos.MinorType.BIGINT);
 
     // bigint -> int, float, double
-    CAST_FUNC.put(TypeProtos.MinorType.BIGINT, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.BIGINT, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.BIGINT).add(TypeProtos.MinorType.INT);
     CAST_FUNC.get(TypeProtos.MinorType.BIGINT).add(TypeProtos.MinorType.FLOAT4);
     CAST_FUNC.get(TypeProtos.MinorType.BIGINT).add(TypeProtos.MinorType.FLOAT8);
 
     // date -> timestamp
-    CAST_FUNC.put(TypeProtos.MinorType.DATE, new HashSet<TypeProtos.MinorType>());
+    CAST_FUNC.put(TypeProtos.MinorType.DATE, new HashSet<>());
     CAST_FUNC.get(TypeProtos.MinorType.DATE).add(TypeProtos.MinorType.TIMESTAMP);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index f463e6d..4684251 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -19,13 +19,9 @@ package org.apache.drill.exec.planner.sql.handlers;
 
 import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
 
-import java.io.IOException;
-
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -37,6 +33,7 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -58,7 +55,7 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
   }
 
   @Override
-  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
     final SqlRefreshMetadata refreshTable = unwrap(sqlNode, SqlRefreshMetadata.class);
 
     try {
@@ -119,7 +116,12 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
       if (!(formatConfig instanceof ParquetFormatConfig)) {
         formatConfig = new ParquetFormatConfig();
       }
-      Metadata.createMeta(fs, selectionRoot, (ParquetFormatConfig) formatConfig);
+
+      ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+        .withFormatConfig((ParquetFormatConfig) formatConfig)
+        .withOptions(context.getOptions())
+        .build();
+      Metadata.createMeta(fs, selectionRoot, readerConfig);
       return direct(true, "Successfully updated metadata for table %s.", tableName);
 
     } catch(Exception e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 1d0bca0..097b231 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -168,6 +168,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index f053986..dd1c91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -38,6 +38,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
@@ -163,11 +164,20 @@ public class FileSystemPlugin extends AbstractStoragePlugin {
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns)
-      throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException {
     FormatSelection formatSelection = selection.getWith(lpPersistance, FormatSelection.class);
     FormatPlugin plugin = getFormatPlugin(formatSelection.getFormat());
-    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns);
+    return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index ed3b602..27a72e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.hadoop.conf.Configuration;
 
@@ -36,28 +37,32 @@ import org.apache.hadoop.conf.Configuration;
  */
 public interface FormatPlugin {
 
-  public boolean supportsRead();
+  boolean supportsRead();
 
-  public boolean supportsWrite();
+  boolean supportsWrite();
 
   /**
    * Indicates whether this FormatPlugin supports auto-partitioning for CTAS statements
    * @return true if auto-partitioning is supported
    */
-  public boolean supportsAutoPartitioning();
+  boolean supportsAutoPartitioning();
 
-  public FormatMatcher getMatcher();
+  FormatMatcher getMatcher();
 
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
+  AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
 
-  public Set<StoragePluginOptimizerRule> getOptimizerRules();
+  Set<StoragePluginOptimizerRule> getOptimizerRules();
 
-  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
+  AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException;
 
-  public FormatPluginConfig getConfig();
-  public StoragePluginConfig getStorageConfig();
-  public Configuration getFsConf();
-  public DrillbitContext getContext();
-  public String getName();
+  default AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+    return getGroupScan(userName, selection, columns);
+  }
+
+  FormatPluginConfig getConfig();
+  StoragePluginConfig getStorageConfig();
+  Configuration getFsConf();
+  DrillbitContext getContext();
+  String getName();
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 1b96e58..9bc969f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
@@ -80,14 +81,20 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   protected List<RowGroupInfo> rowGroupInfos;
   protected ListMultimap<Integer, RowGroupInfo> mappings;
   protected Set<String> fileSet;
+  protected ParquetReaderConfig readerConfig;
 
   private List<EndpointAffinity> endpointAffinities;
   private ParquetGroupScanStatistics parquetGroupScanStatistics;
 
-  protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, List<ReadEntryWithPath> entries, LogicalExpression filter) {
+  protected AbstractParquetGroupScan(String userName,
+                                     List<SchemaPath> columns,
+                                     List<ReadEntryWithPath> entries,
+                                     ParquetReaderConfig readerConfig,
+                                     LogicalExpression filter) {
     super(userName);
     this.columns = columns;
     this.entries = entries;
+    this.readerConfig = readerConfig == null ? ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
   }
 
@@ -103,6 +110,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     this.parquetGroupScanStatistics = that.parquetGroupScanStatistics == null ? null : new ParquetGroupScanStatistics(that.parquetGroupScanStatistics);
     this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
     this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
+    this.readerConfig = that.readerConfig;
   }
 
   @JsonProperty
@@ -115,6 +123,18 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     return entries;
   }
 
+  @JsonProperty("readerConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  // do not serialize reader config if it contains all default values
+  public ParquetReaderConfig getReaderConfigForSerialization() {
+    return ParquetReaderConfig.getDefaultInstance().equals(readerConfig) ? null : readerConfig;
+  }
+
+  @JsonIgnore
+  public ParquetReaderConfig getReaderConfig() {
+    return readerConfig;
+  }
+
   @JsonIgnore
   @Override
   public Collection<String> getFiles() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 8726b9d..52b2baa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -37,15 +38,18 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
 
   protected final List<RowGroupReadEntry> rowGroupReadEntries;
   protected final List<SchemaPath> columns;
+  protected final ParquetReaderConfig readerConfig;
   protected final LogicalExpression filter;
 
   protected AbstractParquetRowGroupScan(String userName,
                                      List<RowGroupReadEntry> rowGroupReadEntries,
                                      List<SchemaPath> columns,
+                                     ParquetReaderConfig readerConfig,
                                      LogicalExpression filter) {
     super(userName);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
+    this.readerConfig = readerConfig == null ? ParquetReaderConfig.getDefaultInstance() : readerConfig;
     this.filter = filter;
   }
 
@@ -59,6 +63,18 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
     return columns;
   }
 
+  @JsonProperty("readerConfig")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  // do not serialize reader config if it contains all default values
+  public ParquetReaderConfig getReaderConfigForSerialization() {
+    return ParquetReaderConfig.getDefaultInstance().equals(readerConfig) ? null : readerConfig;
+  }
+
+  @JsonIgnore
+  public ParquetReaderConfig getReaderConfig() {
+    return readerConfig;
+  }
+
   @JsonProperty
   public LogicalExpression getFilter() {
     return filter;
@@ -80,7 +96,6 @@ public abstract class AbstractParquetRowGroupScan extends AbstractBase implement
   }
 
   public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
-  public abstract boolean areCorruptDatesAutoCorrected();
   @JsonIgnore
   public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
   public abstract boolean supportsFileImplicitColumns();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index b7f9cdb..99161fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -30,12 +30,10 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -50,17 +48,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-
 public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
 
-  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
-  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
-  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
-
   protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
     final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
 
@@ -87,12 +78,13 @@ public abstract class AbstractParquetScanBatchCreator {
       try {
         Stopwatch timer = logger.isTraceEnabled() ? Stopwatch.createUnstarted() : null;
         DrillFileSystem fs = fsManager.get(rowGroupScan.getFsConf(rowGroup), rowGroup.getPath());
+        ParquetReaderConfig readerConfig = rowGroupScan.getReaderConfig();
         if (!footers.containsKey(rowGroup.getPath())) {
           if (timer != null) {
             timer.start();
           }
 
-          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath());
+          ParquetMetadata footer = readFooter(fs.getConf(), rowGroup.getPath(), readerConfig);
           if (timer != null) {
             long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
             logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", rowGroup.getPath(), "", 0, 0, 0, timeToRead);
@@ -101,10 +93,9 @@ public abstract class AbstractParquetScanBatchCreator {
         }
         ParquetMetadata footer = footers.get(rowGroup.getPath());
 
-        boolean autoCorrectCorruptDates = rowGroupScan.areCorruptDatesAutoCorrected();
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
-          ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
-        logger.debug("Contains corrupt dates: {}", containsCorruptDates);
+        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footer,
+          rowGroupScan.getColumns(), readerConfig.autoCorrectCorruptedDates());
+        logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
 
         boolean useNewReader = context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER);
         boolean containsComplexColumn = ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns());
@@ -149,7 +140,7 @@ public abstract class AbstractParquetScanBatchCreator {
     }
 
     // all readers should have the same number of implicit columns, add missing ones with value null
-    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant(null));
     for (Map<String, String> map : implicitColumns) {
       map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
@@ -159,14 +150,9 @@ public abstract class AbstractParquetScanBatchCreator {
 
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
-  private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
-    conf = new Configuration(conf);
-    conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
-    conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
-    conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
-    conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
-    ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
-    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+  private ParquetMetadata readFooter(Configuration conf, String path, ParquetReaderConfig readerConfig) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path),
+      readerConfig.addCountersToConf(conf)), readerConfig.toReadOptions())) {
       return reader.getFooter();
     }
   }
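
As a rough usage sketch (not part of this patch; formatConfig, fsConf and filePath are assumed local
variables), the counter flags now travel through ParquetReaderConfig.addCountersToConf() and the signed
min / max handling through toReadOptions(), instead of being hard-coded in the scan batch creator:

    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig)                               // assumed ParquetFormatConfig
        .withConf(fsConf)                                             // assumed Hadoop Configuration
        .build();
    Configuration conf = readerConfig.addCountersToConf(fsConf);      // benchmark counters, disabled by default
    ParquetReadOptions options = readerConfig.toReadOptions();        // carries useSignedStringMinMax
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path(filePath), conf), options)) {
      ParquetMetadata footer = reader.getFooter();                    // footer statistics drive filter push down
    }
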
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
index ad38849..f0f1029 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.FunctionHolderExpression;
@@ -145,6 +147,11 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
   }
 
   @Override
+  public LogicalExpression visitQuotedStringConstant(ValueExpressions.QuotedString quotedString, Set<LogicalExpression> value) throws RuntimeException {
+    return quotedString;
+  }
+
+  @Override
   public LogicalExpression visitBooleanOperator(BooleanOperator op, Set<LogicalExpression> value) {
     List<LogicalExpression> childPredicates = new ArrayList<>();
     String functionName = op.getName();
@@ -176,31 +183,35 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
 
   private LogicalExpression getValueExpressionFromConst(ValueHolder holder, TypeProtos.MinorType type) {
     switch (type) {
-    case INT:
-      return ValueExpressions.getInt(((IntHolder) holder).value);
-    case BIGINT:
-      return ValueExpressions.getBigInt(((BigIntHolder) holder).value);
-    case FLOAT4:
-      return ValueExpressions.getFloat4(((Float4Holder) holder).value);
-    case FLOAT8:
-      return ValueExpressions.getFloat8(((Float8Holder) holder).value);
-    case VARDECIMAL:
-      VarDecimalHolder decimalHolder = (VarDecimalHolder) holder;
-      return ValueExpressions.getVarDecimal(
-          DecimalUtility.getBigDecimalFromDrillBuf(decimalHolder.buffer,
-              decimalHolder.start, decimalHolder.end - decimalHolder.start, decimalHolder.scale),
-          decimalHolder.precision,
-          decimalHolder.scale);
-    case DATE:
-      return ValueExpressions.getDate(((DateHolder) holder).value);
-    case TIMESTAMP:
-      return ValueExpressions.getTimeStamp(((TimeStampHolder) holder).value);
-    case TIME:
-      return ValueExpressions.getTime(((TimeHolder) holder).value);
-    case BIT:
-      return ValueExpressions.getBit(((BitHolder) holder).value == 1);
-    default:
-      return null;
+      case INT:
+        return ValueExpressions.getInt(((IntHolder) holder).value);
+      case BIGINT:
+        return ValueExpressions.getBigInt(((BigIntHolder) holder).value);
+      case FLOAT4:
+        return ValueExpressions.getFloat4(((Float4Holder) holder).value);
+      case FLOAT8:
+        return ValueExpressions.getFloat8(((Float8Holder) holder).value);
+      case VARDECIMAL:
+        VarDecimalHolder decimalHolder = (VarDecimalHolder) holder;
+        return ValueExpressions.getVarDecimal(
+            DecimalUtility.getBigDecimalFromDrillBuf(decimalHolder.buffer,
+                decimalHolder.start, decimalHolder.end - decimalHolder.start, decimalHolder.scale),
+            decimalHolder.precision,
+            decimalHolder.scale);
+      case DATE:
+        return ValueExpressions.getDate(((DateHolder) holder).value);
+      case TIMESTAMP:
+        return ValueExpressions.getTimeStamp(((TimeStampHolder) holder).value);
+      case TIME:
+        return ValueExpressions.getTime(((TimeHolder) holder).value);
+      case BIT:
+        return ValueExpressions.getBit(((BitHolder) holder).value == 1);
+      case VARCHAR:
+        VarCharHolder varCharHolder = (VarCharHolder) holder;
+        String value = StringFunctionHelpers.toStringFromUTF8(varCharHolder.start, varCharHolder.end, varCharHolder.buffer);
+        return ValueExpressions.getChar(value, value.length());
+      default:
+        return null;
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 9d4c453..2ff8415 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class ParquetFormatConfig implements FormatPluginConfig {
 
   public boolean autoCorrectCorruptDates = true;
+  public boolean enableStringsSignedMinMax = false;
 
   /**
    * @return true if auto correction of corrupt dates is enabled, false otherwise
@@ -36,6 +37,21 @@ public class ParquetFormatConfig implements FormatPluginConfig {
     return autoCorrectCorruptDates;
   }
 
+  /**
+   * Parquet statistics for UTF-8 data in files created prior to parquet library version 1.9.1 were stored incorrectly.
+   * If the user knows for certain that the data in binary columns is ASCII (not UTF-8), setting this property to 'true'
+   * enables statistics usage for varchar and decimal columns.
+   *
+   * Can be overridden for individual tables using the
+   * {@link org.apache.drill.exec.ExecConstants#PARQUET_READER_STRINGS_SIGNED_MIN_MAX} session option.
+   *
+   * @return true if usage of signed min / max string statistics is enabled, false otherwise
+   */
+  @JsonIgnore
+  public boolean isStringsSignedMinMaxEnabled() {
+    return enableStringsSignedMinMax;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -47,12 +63,25 @@ public class ParquetFormatConfig implements FormatPluginConfig {
 
     ParquetFormatConfig that = (ParquetFormatConfig) o;
 
-    return autoCorrectCorruptDates == that.autoCorrectCorruptDates;
+    if (autoCorrectCorruptDates != that.autoCorrectCorruptDates) {
+      return false;
+    }
 
+    return enableStringsSignedMinMax == that.enableStringsSignedMinMax;
   }
 
   @Override
   public int hashCode() {
-    return (autoCorrectCorruptDates ? 1231 : 1237);
+    int result = (autoCorrectCorruptDates ? 1231 : 1237);
+    result = 31 * result + (enableStringsSignedMinMax ? 1231 : 1237);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "ParquetFormatConfig{"
+      + "autoCorrectCorruptDates=" + autoCorrectCorruptDates
+      + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+      + '}';
   }
 }
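
A minimal sketch (not part of this patch) of toggling the new flag programmatically; the fields are public,
so the same values can also come from the "parquet" format entry of a dfs storage plugin configuration, and
the flag can be overridden per session via ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX:

    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
    formatConfig.autoCorrectCorruptDates = true;         // default, unchanged by this patch
    formatConfig.enableStringsSignedMinMax = true;       // opt in to binary (varchar / decimal) statistics
    assert formatConfig.isStringsSignedMinMaxEnabled();
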
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 9d828d2..2c40996 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -67,7 +68,7 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
-public class ParquetFormatPlugin implements FormatPlugin{
+public class ParquetFormatPlugin implements FormatPlugin {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -175,9 +176,17 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
-  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
-      throws IOException {
-    ParquetGroupScan parquetGroupScan = new ParquetGroupScan(userName, selection, this, columns);
+  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return getGroupScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+      .withFormatConfig(getConfig())
+      .withOptions(options)
+      .build();
+    ParquetGroupScan parquetGroupScan = new ParquetGroupScan(userName, selection, this, columns, readerConfig);
     if (parquetGroupScan.getEntries().isEmpty()) {
       // If ParquetGroupScan does not contain any entries, it means selection directories are empty and
       // metadata cache files are invalid, return schemaless scan
@@ -211,7 +220,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
     return formatMatcher;
   }
 
-  private static class ParquetFormatMatcher extends BasicFormatMatcher{
+  private static class ParquetFormatMatcher extends BasicFormatMatcher {
 
     private final ParquetFormatConfig formatConfig;
 
@@ -229,7 +238,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
     public DrillTable isReadable(DrillFileSystem fs, FileSelection selection,
         FileSystemPlugin fsPlugin, String storageEngineName, SchemaConfig schemaConfig)
         throws IOException {
-      if(selection.containsDirectories(fs)) {
+      if (selection.containsDirectories(fs)) {
         Path dirMetaPath = new Path(selection.getSelectionRoot(), Metadata.METADATA_DIRECTORIES_FILENAME);
         // check if the metadata 'directories' file exists; if it does, there is an implicit assumption that
         // the directory is readable since the metadata 'directories' file cannot be created otherwise.  Note
@@ -238,7 +247,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
           // create a metadata context that will be used for the duration of the query for this table
           MetadataContext metaContext = new MetadataContext();
 
-          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath, metaContext, formatConfig);
+          ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
+          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath, metaContext, readerConfig);
           if (mDirs != null && mDirs.getDirectories().size() > 0) {
             FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
                 selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
@@ -249,7 +259,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
                 new FormatSelection(plugin.getConfig(), dirSelection));
           }
         }
-        if(isDirReadable(fs, selection.getFirstPath(fs))) {
+        if (isDirReadable(fs, selection.getFirstPath(fs))) {
           return new DynamicDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
               new FormatSelection(plugin.getConfig(), selection));
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 7fe665d..a1d9f51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -80,8 +80,9 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
                           @JsonProperty("columns") List<SchemaPath> columns,
                           @JsonProperty("selectionRoot") String selectionRoot,
                           @JsonProperty("cacheFileRoot") String cacheFileRoot,
+                          @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                           @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
-    super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
     Preconditions.checkNotNull(storageConfig);
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -98,16 +99,18 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   public ParquetGroupScan(String userName,
                           FileSelection selection,
                           ParquetFormatPlugin formatPlugin,
-                          List<SchemaPath> columns) throws IOException {
-    this(userName, selection, formatPlugin, columns, ValueExpressions.BooleanExpression.TRUE);
+                          List<SchemaPath> columns,
+                          ParquetReaderConfig readerConfig) throws IOException {
+    this(userName, selection, formatPlugin, columns, readerConfig, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public ParquetGroupScan(String userName,
                           FileSelection selection,
                           ParquetFormatPlugin formatPlugin,
                           List<SchemaPath> columns,
+                          ParquetReaderConfig readerConfig,
                           LogicalExpression filter) throws IOException {
-    super(userName, columns, new ArrayList<>(), filter);
+    super(userName, columns, new ArrayList<>(), readerConfig, filter);
 
     this.formatPlugin = formatPlugin;
     this.formatConfig = formatPlugin.getConfig();
@@ -178,7 +181,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, readerConfig, selectionRoot, filter);
   }
 
   @Override
@@ -245,13 +248,13 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         metaPath = new Path(p, Metadata.METADATA_FILENAME);
       }
       if (!metaContext.isMetadataCacheCorrupted() && metaPath != null && fs.exists(metaPath)) {
-        parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig);
+        parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
         if (parquetTableMetadata != null) {
           usedMetadataCache = true;
         }
       }
       if (!usedMetadataCache) {
-        parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), formatConfig);
+        parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig);
       }
     } else {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
@@ -259,7 +262,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
       if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot))
           && fs.exists(metaPath)) {
         if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig);
+          parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
         }
         if (parquetTableMetadata != null) {
           usedMetadataCache = true;
@@ -283,7 +286,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
                     (oldFs, newFs) -> newFs,
                     LinkedHashMap::new));
 
-        parquetTableMetadata = Metadata.getParquetTableMetadata(statusMap, formatConfig);
+        parquetTableMetadata = Metadata.getParquetTableMetadata(statusMap, readerConfig);
       }
     }
   }
@@ -362,7 +365,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
    * @param metaFilePath metadata cache file path
    * @return file selection read from cache
    *
-   * @throws UserException when the updated selection is empty, this happens if the user selects an empty folder.
+   * @throws org.apache.drill.common.exceptions.UserException when the updated selection is empty,
+   * this happens if the user selects an empty folder.
    */
   private FileSelection expandSelectionFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException {
     // get the metadata for the root directory by reading the metadata file
@@ -371,14 +375,14 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
 
     // get (and set internal field) the metadata for the directory by reading the metadata file
     FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
-    parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaFilePath, metaContext, formatConfig);
+    parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaFilePath, metaContext, readerConfig);
     if (ignoreExpandingSelection(parquetTableMetadata)) {
       return selection;
     }
     if (formatConfig.areCorruptDatesAutoCorrected()) {
       ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
     }
-    ParquetReaderUtility.correctBinaryInMetadataCache(parquetTableMetadata);
+    ParquetReaderUtility.transformBinaryInMetadataCache(parquetTableMetadata, readerConfig);
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
     if (fileSet == null) {
@@ -412,7 +416,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         if (status.isDirectory()) {
           //TODO [DRILL-4496] read the metadata cache files in parallel
           final Path metaPath = new Path(cacheFileRoot, Metadata.METADATA_FILENAME);
-          final ParquetTableMetadataBase metadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig);
+          final ParquetTableMetadataBase metadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
           if (ignoreExpandingSelection(metadata)) {
             return selection;
           }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 6efd44d..c59cdce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -18,8 +18,6 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -45,6 +43,7 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -137,22 +136,22 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
     // then we could not pushed down. Otherwise, it's qualified to be pushed down.
     final List<RexNode> predList = RelOptUtil.conjunctions(condition);
 
-    final List<RexNode> qualifiedPredList = Lists.newArrayList();
+    final List<RexNode> qualifiedPredList = new ArrayList<>();
 
     for (final RexNode pred : predList) {
-      if (DrillRelOptUtil.findOperators(pred, ImmutableList.of(), BANNED_OPERATORS) == null) {
+      if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) == null) {
         qualifiedPredList.add(pred);
       }
     }
 
-    final RexNode qualifedPred = RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);
+    final RexNode qualifiedPred = RexUtil.composeConjunction(filter.getCluster().getRexBuilder(), qualifiedPredList, true);
 
-    if (qualifedPred == null) {
+    if (qualifiedPred == null) {
       return;
     }
 
     LogicalExpression conditionExp = DrillOptiq.toDrill(
-        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifedPred);
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifiedPred);
 
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
@@ -170,7 +169,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
     RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
 
     if (project != null) {
-      newScan = project.copy(project.getTraitSet(), ImmutableList.of(newScan));
+      newScan = project.copy(project.getTraitSet(), Collections.singletonList(newScan));
     }
 
     if (newGroupScan instanceof AbstractParquetGroupScan) {
@@ -184,10 +183,11 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
       }
       if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
         call.transformTo(newScan);
+        return;
       }
     }
 
-    final RelNode newFilter = filter.copy(filter.getTraitSet(), ImmutableList.<RelNode>of(newScan));
+    final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newScan));
     call.transformTo(newFilter);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
new file mode 100644
index 0000000..3b668c7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+
+import java.util.Objects;
+
+/**
+ * Stores consolidated parquet reading configuration. Config values can be obtained from several sources,
+ * assigned in the following order, later sources taking precedence:
+ * <li>parquet format config</li>
+ * <li>Hadoop configuration</li>
+ * <li>session options</li>
+ *
+ * During serialization, default values are omitted to keep the serialized object small.
+ */
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
+public class ParquetReaderConfig {
+
+  public static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+  public static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+  public static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
+  private static final ParquetReaderConfig DEFAULT_INSTANCE = new ParquetReaderConfig();
+
+  private boolean enableBytesReadCounter = false;
+  private boolean enableBytesTotalCounter = false;
+  private boolean enableTimeReadCounter = false;
+  private boolean autoCorrectCorruptedDates = true;
+  private boolean enableStringsSignedMinMax = false;
+
+  public static ParquetReaderConfig.Builder builder() {
+    return new ParquetReaderConfig.Builder();
+  }
+
+  public static ParquetReaderConfig getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  @JsonCreator
+  public ParquetReaderConfig(@JsonProperty("enableBytesReadCounter") Boolean enableBytesReadCounter,
+                             @JsonProperty("enableBytesTotalCounter") Boolean enableBytesTotalCounter,
+                             @JsonProperty("enableTimeReadCounter") Boolean enableTimeReadCounter,
+                             @JsonProperty("autoCorrectCorruptedDates") Boolean autoCorrectCorruptedDates,
+                             @JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax) {
+    this.enableBytesReadCounter = enableBytesReadCounter == null ? this.enableBytesReadCounter : enableBytesReadCounter;
+    this.enableBytesTotalCounter = enableBytesTotalCounter == null ? this.enableBytesTotalCounter : enableBytesTotalCounter;
+    this.enableTimeReadCounter = enableTimeReadCounter == null ? this.enableTimeReadCounter : enableTimeReadCounter;
+    this.autoCorrectCorruptedDates = autoCorrectCorruptedDates == null ? this.autoCorrectCorruptedDates : autoCorrectCorruptedDates;
+    this.enableStringsSignedMinMax = enableStringsSignedMinMax == null ? this.enableStringsSignedMinMax : enableStringsSignedMinMax;
+  }
+
+  private ParquetReaderConfig() { }
+
+  @JsonProperty("enableBytesReadCounter")
+  public boolean enableBytesReadCounter() {
+    return enableBytesReadCounter;
+  }
+
+  @JsonProperty("enableBytesTotalCounter")
+  public boolean enableBytesTotalCounter() {
+    return enableBytesTotalCounter;
+  }
+
+  @JsonProperty("enableTimeReadCounter")
+  public boolean enableTimeReadCounter() {
+    return enableTimeReadCounter;
+  }
+
+  @JsonProperty("autoCorrectCorruptedDates")
+  public boolean autoCorrectCorruptedDates() {
+    return autoCorrectCorruptedDates;
+  }
+
+  @JsonProperty("enableStringsSignedMinMax")
+  public boolean enableStringsSignedMinMax() {
+    return enableStringsSignedMinMax;
+  }
+
+  public ParquetReadOptions toReadOptions() {
+    return ParquetReadOptions.builder()
+      .useSignedStringMinMax(enableStringsSignedMinMax)
+      .build();
+  }
+
+  public Configuration addCountersToConf(Configuration conf) {
+    Configuration newConfig = new Configuration(conf);
+    newConfig.setBoolean(ENABLE_BYTES_READ_COUNTER, enableBytesReadCounter);
+    newConfig.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, enableBytesTotalCounter);
+    newConfig.setBoolean(ENABLE_TIME_READ_COUNTER, enableTimeReadCounter);
+    return newConfig;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(enableBytesReadCounter,
+      enableBytesTotalCounter,
+      enableTimeReadCounter,
+      autoCorrectCorruptedDates,
+      enableStringsSignedMinMax);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ParquetReaderConfig that = (ParquetReaderConfig) o;
+    return enableBytesReadCounter == that.enableBytesReadCounter
+      && enableBytesTotalCounter == that.enableBytesTotalCounter
+      && enableTimeReadCounter == that.enableTimeReadCounter
+      && autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
+      && enableStringsSignedMinMax == that.enableStringsSignedMinMax;
+  }
+
+  @Override
+  public String toString() {
+    return "ParquetReaderConfig{"
+      + "enableBytesReadCounter=" + enableBytesReadCounter
+      + ", enableBytesTotalCounter=" + enableBytesTotalCounter
+      + ", enableTimeReadCounter=" + enableTimeReadCounter
+      + ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
+      + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+      + '}';
+  }
+
+  public static class Builder {
+
+    private ParquetFormatConfig formatConfig;
+    private Configuration conf;
+    private OptionManager options;
+
+    public Builder withFormatConfig(ParquetFormatConfig formatConfig) {
+      this.formatConfig = formatConfig;
+      return this;
+    }
+
+    public Builder withConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder withOptions(OptionManager options) {
+      this.options = options;
+      return this;
+    }
+
+    public ParquetReaderConfig build() {
+      ParquetReaderConfig readerConfig = new ParquetReaderConfig();
+
+      // first assign configuration values from format config
+      if (formatConfig != null) {
+        readerConfig.autoCorrectCorruptedDates = formatConfig.areCorruptDatesAutoCorrected();
+        readerConfig.enableStringsSignedMinMax = formatConfig.isStringsSignedMinMaxEnabled();
+      }
+
+      // then assign configuration values from Hadoop configuration
+      if (conf != null) {
+        readerConfig.enableBytesReadCounter = conf.getBoolean(ENABLE_BYTES_READ_COUNTER, readerConfig.enableBytesReadCounter);
+        readerConfig.enableBytesTotalCounter = conf.getBoolean(ENABLE_BYTES_TOTAL_COUNTER, readerConfig.enableBytesTotalCounter);
+        readerConfig.enableTimeReadCounter = conf.getBoolean(ENABLE_TIME_READ_COUNTER, readerConfig.enableTimeReadCounter);
+      }
+
+      // last assign values from session options, session options have higher priority than other configurations
+      if (options != null) {
+        String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
+        if (!option.isEmpty()) {
+          readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option);
+        }
+      }
+
+      return readerConfig;
+    }
+
+  }
+
+}
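
A minimal sketch (not part of this patch) of the builder precedence, assuming parquetFormatConfig,
hadoopConf and sessionOptions are available in the calling context:

    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
        .withFormatConfig(parquetFormatConfig)   // lowest priority: autoCorrectCorruptedDates, enableStringsSignedMinMax
        .withConf(hadoopConf)                    // parquet.benchmark.* counter flags
        .withOptions(sessionOptions)             // highest priority: overrides enableStringsSignedMinMax when set
        .build();
    ParquetReadOptions readOptions = readerConfig.toReadOptions();   // later passed to ParquetFileReader.open(...)
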
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 8bfdf18..733915b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -28,6 +27,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
 import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.work.ExecErrorConstants;
+import org.apache.hadoop.util.VersionUtil;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -52,6 +52,7 @@ import org.joda.time.DateTimeZone;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -98,6 +99,9 @@ public class ParquetReaderUtility {
    * Prior versions had dates formatted with {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#CORRECT_CORRUPT_DATE_SHIFT}
    */
   public static final int DRILL_WRITER_VERSION_STD_DATE_FORMAT = 2;
+
+  public static final String ALLOWED_DRILL_VERSION_FOR_BINARY = "1.15.0";
+
   /**
    * For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
    * based on the file metadata. For older files that lack statistics we must actually test the values
@@ -275,36 +279,39 @@ public class ParquetReaderUtility {
   }
 
   /**
-   * Checks assigns byte arrays to min/max values obtained from the deserialized string
-   * for BINARY.
+   *
+   * Transforms min / max values of binary statistics into byte arrays.
+   * Transformation logic depends on the metadata file version.
    *
    * @param parquetTableMetadata table metadata that should be corrected
+   * @param readerConfig parquet reader config
    */
-  public static void correctBinaryInMetadataCache(ParquetTableMetadataBase parquetTableMetadata) {
+  public static void transformBinaryInMetadataCache(ParquetTableMetadataBase parquetTableMetadata, ParquetReaderConfig readerConfig) {
     // Looking for the names of the columns with BINARY data type
     // in the metadata cache file for V2 and all v3 versions
     Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
+    boolean allowBinaryMetadata = allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig);
 
     for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
       for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
         Long rowCount = rowGroupMetadata.getRowCount();
         for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
-          // Setting Min/Max values for ParquetTableMetadata_v1
+          // Setting Min / Max values for ParquetTableMetadata_v1
           if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
             if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
                 || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-              setMinMaxValues(columnMetadata, rowCount);
+              setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
             }
           }
-          // Setting Min/Max values for V2 and all V3 versions before V3_3
+          // Setting Min / Max values for V2 and all V3 versions prior to V3_3
           else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) < 0
                     && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
-            setMinMaxValues(columnMetadata, rowCount);
+            setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
           }
-          // Setting Min/Max values for V3_3 and all younger versions
+          // Setting Min / Max values for V3_3 and all later versions
           else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0
                       && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
-            convertMinMaxValues(columnMetadata, rowCount);
+            setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, true);
           }
         }
       }
@@ -312,6 +319,21 @@ public class ParquetReaderUtility {
   }
 
   /**
+   * If binary metadata was stored prior to Drill version {@link #ALLOWED_DRILL_VERSION_FOR_BINARY},
+   * it might have incorrectly defined min / max values.
+   * If the given version is null, we assume it is prior to {@link #ALLOWED_DRILL_VERSION_FOR_BINARY}.
+   * In this case we allow reading such metadata only if {@link ParquetReaderConfig#enableStringsSignedMinMax()} is true.
+   *
+   * @param drillVersion drill version used to create metadata file
+   * @param readerConfig parquet reader configuration
+   * @return true if reading binary min / max values is allowed, false otherwise
+   */
+  private static boolean allowBinaryMetadata(String drillVersion, ParquetReaderConfig readerConfig) {
+    return readerConfig.enableStringsSignedMinMax() ||
+      (drillVersion != null && VersionUtil.compareVersions(ALLOWED_DRILL_VERSION_FOR_BINARY, drillVersion) <= 0);
+  }
+
+  /**
    * Returns the set of the lists with names of the columns with BINARY or
    * FIXED_LEN_BYTE_ARRAY data type from {@code ParquetTableMetadataBase columnTypeMetadataCollection}
    * if parquetTableMetadata has version v2 or v3 (including minor versions).
@@ -320,7 +342,7 @@ public class ParquetReaderUtility {
    * @return set of the lists with column names
    */
   private static Set<List<String>> getBinaryColumnsNames(ParquetTableMetadataBase parquetTableMetadata) {
-    Set<List<String>> names = Sets.newHashSet();
+    Set<List<String>> names = new HashSet<>();
     if (parquetTableMetadata instanceof ParquetTableMetadata_v2) {
       for (ColumnTypeMetadata_v2 columnTypeMetadata :
         ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
@@ -342,39 +364,36 @@ public class ParquetReaderUtility {
   }
 
   /**
-   * Checks that column has single value and replaces Min and Max by their byte values
-   * in {@code Metadata.ColumnMetadata columnMetadata} if their values were stored as strings.
+   * Sets min / max values for a binary column. If binary metadata is not allowed (logic is defined in
+   * {@link ParquetReaderUtility#allowBinaryMetadata(String, ParquetReaderConfig)}),
+   * the values are set only if min and max are the same,
+   * otherwise min and max are set to null.
    *
    * @param columnMetadata column metadata that should be changed
-   * @param rowCount       rows count in column chunk
+   * @param rowCount rows count in column chunk
+   * @param allowBinaryMetadata whether reading binary metadata is allowed
+   * @param needsDecoding whether min and max values are Base64-encoded and should be decoded
    */
-  private static void setMinMaxValues(ColumnMetadata columnMetadata, long rowCount) {
-    if (columnMetadata.hasSingleValue(rowCount)) {
-      Object minValue = columnMetadata.getMinValue();
-      if (minValue != null && minValue instanceof String) {
-        byte[] bytes = ((String) minValue).getBytes();
-        columnMetadata.setMax(bytes);
-        columnMetadata.setMin(bytes);
-      }
-    }
-  }
+  private static void setMinMaxValues(ColumnMetadata columnMetadata, long rowCount, boolean allowBinaryMetadata, boolean needsDecoding) {
+    byte[] minBytes = null;
+    byte[] maxBytes = null;
 
-  /**
-   * Checks that column has single value and replaces Min and Max by their byte values from Base64 data
-   * in Metadata.ColumnMetadata columnMetadata if their values were stored as strings.
-   *
-   * @param columnMetadata column metadata that should be changed
-   * @param rowCount       rows count in column chunk
-   */
-  private static void convertMinMaxValues(ColumnMetadata columnMetadata, long rowCount) {
-    if (columnMetadata.hasSingleValue(rowCount)) {
+    boolean hasSingleValue = false;
+    if (allowBinaryMetadata || (hasSingleValue = columnMetadata.hasSingleValue(rowCount))) {
       Object minValue = columnMetadata.getMinValue();
-      if (minValue != null && minValue instanceof String) {
-        byte[] bytes = Base64.decodeBase64(((String) minValue).getBytes());
-        columnMetadata.setMax(bytes);
-        columnMetadata.setMin(bytes);
+      Object maxValue = columnMetadata.getMaxValue();
+      if (minValue instanceof String && maxValue instanceof String) {
+        minBytes = ((String) minValue).getBytes();
+        maxBytes = ((String) maxValue).getBytes();
+        if (needsDecoding) {
+          minBytes = Base64.decodeBase64(minBytes);
+          maxBytes = hasSingleValue ? minBytes : Base64.decodeBase64(maxBytes);
+        }
       }
     }
+
+    columnMetadata.setMin(minBytes);
+    columnMetadata.setMax(maxBytes);
   }
 
   /**
@@ -522,7 +541,7 @@ public class ParquetReaderUtility {
    * @return  Timestamp in milliseconds - the number of milliseconds since January 1, 1970, 00:00:00 GMT
    *          represented by @param binaryTimeStampValue.
    *          The nanos precision is cut to millis. Therefore the length of single timestamp value is
-   *          {@value NullableTimeStampHolder#WIDTH} bytes instead of 12 bytes.
+   *          {@value org.apache.drill.exec.expr.holders.NullableTimeStampHolder#WIDTH} bytes instead of 12 bytes.
    */
     public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue, boolean retainLocalTimezone) {
       // This method represents binaryTimeStampValue as ByteBuffer, where timestamp is stored as sum of
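
The version gate in allowBinaryMetadata() relies on Hadoop's VersionUtil; a small sketch (not part of this
patch) of the comparison semantics:

    // compareVersions(a, b) returns a negative value when a < b, zero when equal, positive when a > b
    String allowed = ParquetReaderUtility.ALLOWED_DRILL_VERSION_FOR_BINARY;        // "1.15.0"
    boolean trusted = VersionUtil.compareVersions(allowed, "1.15.0") <= 0;         // true: written by Drill 1.15.0+
    boolean needsOptIn = !(VersionUtil.compareVersions(allowed, "1.14.0") <= 0);   // true: older metadata requires
                                                                                   // enableStringsSignedMinMax = true
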
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index cbcca79..eabe2df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -54,12 +54,14 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
                              @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
                              @JsonProperty("columns") List<SchemaPath> columns,
+                             @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                              @JsonProperty("selectionRoot") String selectionRoot,
                              @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
         rowGroupReadEntries,
         columns,
+        readerConfig,
         selectionRoot,
         filter);
   }
@@ -68,9 +70,10 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
                              ParquetFormatPlugin formatPlugin,
                              List<RowGroupReadEntry> rowGroupReadEntries,
                              List<SchemaPath> columns,
+                             ParquetReaderConfig readerConfig,
                              String selectionRoot,
                              LogicalExpression filter) {
-    super(userName, rowGroupReadEntries, columns, filter);
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
     this.formatConfig = formatPlugin.getConfig();
     this.selectionRoot = selectionRoot;
@@ -99,7 +102,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
   }
 
   @Override
@@ -109,12 +112,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
-  }
-
-  @Override
-  public boolean areCorruptDatesAutoCorrected() {
-    return formatConfig.areCorruptDatesAutoCorrected();
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, readerConfig, selectionRoot, filter);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index b16a4cc..0db007a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -35,7 +36,6 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.store.dfs.MetadataContext;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -64,6 +63,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -88,16 +88,15 @@ public class Metadata {
   public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
   public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
-  public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled";
 
-  private final ParquetFormatConfig formatConfig;
+  private final ParquetReaderConfig readerConfig;
 
   private ParquetTableMetadataBase parquetTableMetadata;
   private ParquetTableMetadataDirs parquetTableMetadataDirs;
 
 
-  private Metadata(ParquetFormatConfig formatConfig) {
-    this.formatConfig = formatConfig;
+  private Metadata(ParquetReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
   }
 
   /**
@@ -105,9 +104,10 @@ public class Metadata {
    *
    * @param fs file system
    * @param path path
+   * @param readerConfig parquet reader configuration
    */
-  public static void createMeta(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException {
-    Metadata metadata = new Metadata(formatConfig);
+  public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException {
+    Metadata metadata = new Metadata(readerConfig);
     metadata.createMetaFilesRecursively(path, fs);
   }
 
@@ -116,11 +116,12 @@ public class Metadata {
    *
    * @param fs file system
    * @param path path
+   * @param readerConfig parquet reader configuration
+   *
    * @return parquet table metadata
    */
-  public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, String path, ParquetFormatConfig formatConfig)
-      throws IOException {
-    Metadata metadata = new Metadata(formatConfig);
+  public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException {
+    Metadata metadata = new Metadata(readerConfig);
     return metadata.getParquetTableMetadata(path, fs);
   }
 
@@ -128,12 +129,12 @@ public class Metadata {
    * Get the parquet metadata for a list of parquet files.
    *
    * @param fileStatusMap file statuses and corresponding file systems
-   * @param formatConfig parquet format config
+   * @param readerConfig parquet reader configuration
    * @return parquet table metadata
    */
   public static ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap,
-                                                                ParquetFormatConfig formatConfig) throws IOException {
-    Metadata metadata = new Metadata(formatConfig);
+                                                                ParquetReaderConfig readerConfig) throws IOException {
+    Metadata metadata = new Metadata(readerConfig);
     return metadata.getParquetTableMetadata(fileStatusMap);
   }
 
@@ -143,15 +144,17 @@ public class Metadata {
    * @param fs current file system
    * @param path The path to the metadata file, located in the directory that contains the parquet files
    * @param metaContext metadata context
-   * @param formatConfig parquet format plugin configs
+   * @param readerConfig parquet reader configuration
    * @return parquet table metadata. Null if metadata cache is missing, unsupported or corrupted
    */
-  public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs, Path path, MetadataContext metaContext,
-      ParquetFormatConfig formatConfig) {
+  public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs,
+                                                                 Path path,
+                                                                 MetadataContext metaContext,
+                                                                 ParquetReaderConfig readerConfig) {
     if (ignoreReadingMetadata(metaContext, path)) {
       return null;
     }
-    Metadata metadata = new Metadata(formatConfig);
+    Metadata metadata = new Metadata(readerConfig);
     metadata.readBlockMeta(path, false, metaContext, fs);
     return metadata.parquetTableMetadata;
   }
@@ -162,15 +165,17 @@ public class Metadata {
    * @param fs current file system
    * @param path The path to the metadata file, located in the directory that contains the parquet files
    * @param metaContext metadata context
-   * @param formatConfig parquet format plugin configs
+   * @param readerConfig parquet reader configuration
    * @return parquet metadata for a directory. Null if metadata cache is missing, unsupported or corrupted
    */
-  public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, Path path,
-      MetadataContext metaContext, ParquetFormatConfig formatConfig) {
+  public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs,
+                                                                    Path path,
+                                                                    MetadataContext metaContext,
+                                                                    ParquetReaderConfig readerConfig) {
     if (ignoreReadingMetadata(metaContext, path)) {
       return null;
     }
-    Metadata metadata = new Metadata(formatConfig);
+    Metadata metadata = new Metadata(readerConfig);
     metadata.readBlockMeta(path, true, metaContext, fs);
     return metadata.parquetTableMetadataDirs;
   }
@@ -412,12 +417,9 @@ public class Metadata {
     final ParquetMetadata metadata;
     final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
     final Configuration conf = new Configuration(fs.getConf());
-    final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
-        .useSignedStringMinMax(true)
-        .build();
     try {
       metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
-        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) {
+        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
           return parquetFileReader.getFooter();
         }
       });
@@ -429,8 +431,7 @@ public class Metadata {
 
     MessageType schema = metadata.getFileMetaData().getSchema();
 
-//    Map<SchemaPath, OriginalType> originalTypeMap = Maps.newHashMap();
-    Map<SchemaPath, ColTypeInfo> colTypeInfoMap = Maps.newHashMap();
+    Map<SchemaPath, ColTypeInfo> colTypeInfoMap = new HashMap<>();
     schema.getPaths();
     for (String[] path : schema.getPaths()) {
       colTypeInfoMap.put(SchemaPath.getCompoundPath(path), getColTypeInfo(schema, schema, path, 0));
@@ -440,11 +441,10 @@ public class Metadata {
 
     ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
     ALL_COLS.add(SchemaPath.STAR_COLUMN);
-    boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
-    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
-    if (logger.isDebugEnabled()) {
-      logger.debug(containsCorruptDates.toString());
-    }
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS,
+      readerConfig.autoCorrectCorruptedDates());
+    logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+
     for (BlockMetaData rowGroup : metadata.getBlocks()) {
       List<ColumnMetadata_v3> columnMetadataList = new ArrayList<>();
       long length = 0;
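
The public Metadata entry points now take a ParquetReaderConfig; a rough sketch (not part of this patch) of
how a caller is expected to invoke them, with fs, tablePath, formatConfig and options assumed to exist in
the calling context:

    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig)
        .withOptions(options)
        .build();
    Metadata.createMeta(fs, tablePath, readerConfig);                    // writes .drill.parquet_metadata
    ParquetTableMetadata_v3 tableMetadata =
        Metadata.getParquetTableMetadata(fs, tablePath, readerConfig);   // reads footers via readerConfig.toReadOptions()
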
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
index e8f2b79..8ce5f37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetMetaStatCollector.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.stat;
 
+import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -29,10 +30,12 @@ import org.apache.parquet.column.statistics.FloatStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.joda.time.DateTimeConstants;
 
+import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +53,6 @@ public class ParquetMetaStatCollector implements  ColumnStatCollector {
   private final ParquetTableMetadataBase parquetTableMetadata;
   private final List<? extends ColumnMetadata> columnMetadataList;
   private final Map<String, String> implicitColValues;
-
   public ParquetMetaStatCollector(ParquetTableMetadataBase parquetTableMetadata,
       List<? extends ColumnMetadata> columnMetadataList, Map<String, String> implicitColValues) {
     this.parquetTableMetadata = parquetTableMetadata;
@@ -86,30 +88,26 @@ public class ParquetMetaStatCollector implements  ColumnStatCollector {
       columnMetadataMap.put(schemaPath, columnMetadata);
     }
 
-    for (final SchemaPath field : fields) {
-      final PrimitiveType.PrimitiveTypeName primitiveType;
-      final OriginalType originalType;
-
-      final ColumnMetadata columnMetadata = columnMetadataMap.get(field.getUnIndexed());
-
+    for (SchemaPath field : fields) {
+      ColumnMetadata columnMetadata = columnMetadataMap.get(field.getUnIndexed());
       if (columnMetadata != null) {
-        final Object min = columnMetadata.getMinValue();
-        final Object max = columnMetadata.getMaxValue();
-        final long numNulls = columnMetadata.getNulls() == null ? -1 : columnMetadata.getNulls();
-
-        primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
-        originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
-        int precision = 0;
-        int scale = 0;
+        ColumnStatisticsBuilder statisticsBuilder = ColumnStatisticsBuilder.builder()
+          .setMin(columnMetadata.getMinValue())
+          .setMax(columnMetadata.getMaxValue())
+          .setNumNulls(columnMetadata.getNulls() == null ? GroupScan.NO_COLUMN_STATS : columnMetadata.getNulls())
+          .setPrimitiveType(parquetTableMetadata.getPrimitiveType(columnMetadata.getName()))
+          .setOriginalType(parquetTableMetadata.getOriginalType(columnMetadata.getName()));
+
         // ColumnTypeMetadata_v3 stores information about scale and precision
         if (parquetTableMetadata instanceof ParquetTableMetadata_v3) {
           ColumnTypeMetadata_v3 columnTypeInfo = ((ParquetTableMetadata_v3) parquetTableMetadata)
                                                                           .getColumnTypeInfo(columnMetadata.getName());
-          scale = columnTypeInfo.scale;
-          precision = columnTypeInfo.precision;
+          statisticsBuilder.setScale(columnTypeInfo.scale);
+          statisticsBuilder.setPrecision(columnTypeInfo.precision);
         }
 
-        statMap.put(field, getStat(min, max, numNulls, primitiveType, originalType, scale, precision));
+        statMap.put(field, statisticsBuilder.build());
+
       } else {
         final String columnName = field.getRootSegment().getPath();
         if (implicitColValues.containsKey(columnName)) {
@@ -132,62 +130,172 @@ public class ParquetMetaStatCollector implements  ColumnStatCollector {
   }
 
   /**
-   * Builds column statistics using given primitiveType, originalType, scale,
-   * precision, numNull, min and max values.
+   * Helper class that creates parquet {@link ColumnStatistics} based on given
+   * min and max values, type, number of nulls, precision and scale.
    *
-   * @param min             min value for statistics
-   * @param max             max value for statistics
-   * @param numNulls        num_nulls for statistics
-   * @param primitiveType   type that determines statistics class
-   * @param originalType    type that determines statistics class
-   * @param scale           scale value (used for DECIMAL type)
-   * @param precision       precision value (used for DECIMAL type)
-   * @return column statistics
    */
-  private ColumnStatistics getStat(Object min, Object max, long numNulls,
-                                   PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType,
-                                   int scale, int precision) {
-    Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
-    Statistics convertedStat = stat;
-
-    TypeProtos.MajorType type = ParquetReaderUtility.getType(primitiveType, originalType, scale, precision);
-    stat.setNumNulls(numNulls);
-
-    if (min != null && max != null ) {
-      switch (type.getMinorType()) {
-      case INT :
-      case TIME:
-        ((IntStatistics) stat).setMinMax(Integer.parseInt(min.toString()), Integer.parseInt(max.toString()));
-        break;
-      case BIGINT:
-      case TIMESTAMP:
-        ((LongStatistics) stat).setMinMax(Long.parseLong(min.toString()), Long.parseLong(max.toString()));
-        break;
-      case FLOAT4:
-        ((FloatStatistics) stat).setMinMax(Float.parseFloat(min.toString()), Float.parseFloat(max.toString()));
-        break;
-      case FLOAT8:
-        ((DoubleStatistics) stat).setMinMax(Double.parseDouble(min.toString()), Double.parseDouble(max.toString()));
-        break;
-      case DATE:
-        convertedStat = new LongStatistics();
-        convertedStat.setNumNulls(stat.getNumNulls());
-        final long minMS = convertToDrillDateValue(Integer.parseInt(min.toString()));
-        final long maxMS = convertToDrillDateValue(Integer.parseInt(max.toString()));
-        ((LongStatistics) convertedStat ).setMinMax(minMS, maxMS);
-        break;
-      case BIT:
-        ((BooleanStatistics) stat).setMinMax(Boolean.parseBoolean(min.toString()), Boolean.parseBoolean(max.toString()));
-        break;
-      default:
-      }
+  private static class ColumnStatisticsBuilder {
+
+    private Object min;
+    private Object max;
+    private long numNulls;
+    private PrimitiveType.PrimitiveTypeName primitiveType;
+    private OriginalType originalType;
+    private int scale;
+    private int precision;
+
+    static ColumnStatisticsBuilder builder() {
+      return new ColumnStatisticsBuilder();
     }
 
-    return new ColumnStatistics(convertedStat, type);
-  }
+    ColumnStatisticsBuilder setMin(Object min) {
+      this.min = min;
+      return this;
+    }
 
-  private static long convertToDrillDateValue(int dateValue) {
+    ColumnStatisticsBuilder setMax(Object max) {
+      this.max = max;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setNumNulls(long numNulls) {
+      this.numNulls = numNulls;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setPrimitiveType(PrimitiveType.PrimitiveTypeName primitiveType) {
+      this.primitiveType = primitiveType;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setOriginalType(OriginalType originalType) {
+      this.originalType = originalType;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setScale(int scale) {
+      this.scale = scale;
+      return this;
+    }
+
+    ColumnStatisticsBuilder setPrecision(int precision) {
+      this.precision = precision;
+      return this;
+    }
+
+
+    /**
+     * Builds column statistics using given primitive and original types,
+     * scale, precision, number of nulls, min and max values.
+     * Min and max values for binary statistics are set only if allowed.
+     *
+     * @return column statistics
+     */
+    ColumnStatistics build() {
+      Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
+      Statistics convertedStat = stat;
+
+      TypeProtos.MajorType type = ParquetReaderUtility.getType(primitiveType, originalType, scale, precision);
+      stat.setNumNulls(numNulls);
+
+      if (min != null && max != null) {
+        switch (type.getMinorType()) {
+          case INT :
+          case TIME:
+            ((IntStatistics) stat).setMinMax(Integer.parseInt(min.toString()), Integer.parseInt(max.toString()));
+            break;
+          case BIGINT:
+          case TIMESTAMP:
+            ((LongStatistics) stat).setMinMax(Long.parseLong(min.toString()), Long.parseLong(max.toString()));
+            break;
+          case FLOAT4:
+            ((FloatStatistics) stat).setMinMax(Float.parseFloat(min.toString()), Float.parseFloat(max.toString()));
+            break;
+          case FLOAT8:
+            ((DoubleStatistics) stat).setMinMax(Double.parseDouble(min.toString()), Double.parseDouble(max.toString()));
+            break;
+          case DATE:
+            convertedStat = new LongStatistics();
+            convertedStat.setNumNulls(stat.getNumNulls());
+            long minMS = convertToDrillDateValue(Integer.parseInt(min.toString()));
+            long maxMS = convertToDrillDateValue(Integer.parseInt(max.toString()));
+            ((LongStatistics) convertedStat).setMinMax(minMS, maxMS);
+            break;
+          case BIT:
+            ((BooleanStatistics) stat).setMinMax(Boolean.parseBoolean(min.toString()), Boolean.parseBoolean(max.toString()));
+            break;
+          case VARCHAR:
+            if (min instanceof Binary && max instanceof Binary) { // when read directly from parquet footer
+              ((BinaryStatistics) stat).setMinMaxFromBytes(((Binary) min).getBytes(), ((Binary) max).getBytes());
+            } else if (min instanceof byte[] && max instanceof byte[]) { // when deserialized from Drill metadata file
+              ((BinaryStatistics) stat).setMinMaxFromBytes((byte[]) min, (byte[]) max);
+            } else {
+              logger.trace("Unexpected class for Varchar statistics for min / max values. Min: {}. Max: {}.",
+                min.getClass(), max.getClass());
+            }
+            break;
+          case VARDECIMAL:
+            byte[] minBytes = null;
+            byte[] maxBytes = null;
+            boolean setLength = false;
+
+            switch (primitiveType) {
+              case INT32:
+              case INT64:
+                minBytes = new BigInteger(min.toString()).toByteArray();
+                maxBytes = new BigInteger(max.toString()).toByteArray();
+                break;
+              case FIXED_LEN_BYTE_ARRAY:
+                setLength = true;
+                // fall through
+              case BINARY:
+                // wrap up into BigInteger to avoid PARQUET-1417
+                if (min instanceof Binary && max instanceof Binary) { // when read directly from parquet footer
+                  minBytes = new BigInteger(((Binary) min).getBytes()).toByteArray();
+                  maxBytes = new BigInteger(((Binary) max).getBytes()).toByteArray();
+                } else if (min instanceof byte[] && max instanceof byte[]) {  // when deserialized from Drill metadata file
+                  minBytes = new BigInteger((byte[]) min).toByteArray();
+                  maxBytes = new BigInteger((byte[]) max).toByteArray();
+                } else {
+                  logger.trace("Unexpected class for Binary Decimal statistics for min / max values. Min: {}. Max: {}.",
+                    min.getClass(), max.getClass());
+                }
+                break;
+              default:
+                logger.trace("Unexpected primitive type [{}] for Decimal statistics.", primitiveType);
+            }
+
+            if (minBytes == null || maxBytes == null) {
+              break;
+            }
+
+            int length = setLength ? maxBytes.length : 0;
+
+            PrimitiveType decimalType = org.apache.parquet.schema.Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
+              .as(OriginalType.DECIMAL)
+              .length(length)
+              .precision(precision)
+              .scale(scale)
+              .named("decimal_type");
+
+            convertedStat = Statistics.getBuilderForReading(decimalType)
+              .withMin(minBytes)
+              .withMax(maxBytes)
+              .withNumNulls(numNulls)
+              .build();
+            break;
+
+          default:
+            logger.trace("Unsupported minor type [{}] for parquet statistics.", type.getMinorType());
+        }
+      }
+      return new ColumnStatistics(convertedStat, type);
+    }
+
+    private long convertToDrillDateValue(int dateValue) {
       return dateValue * (long) DateTimeConstants.MILLIS_PER_DAY;
+    }
+
   }
 
 }
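A minimal, self-contained sketch (illustrative only, not part of this patch) of the VARDECIMAL handling above: raw min / max bytes are round-tripped through BigInteger (to sidestep PARQUET-1417) and then handed to the Parquet 1.10.0 reading-side statistics builder. The class name and the sample values are assumptions made for the example.

import java.math.BigInteger;

import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class DecimalStatsSketch {

  // Normalizes min / max bytes through BigInteger (as the collector above does)
  // and builds decimal statistics for the given precision and scale.
  static Statistics<?> decimalStats(byte[] min, byte[] max, long numNulls, int precision, int scale) {
    byte[] minBytes = new BigInteger(min).toByteArray();
    byte[] maxBytes = new BigInteger(max).toByteArray();
    PrimitiveType decimalType = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY)
        .as(OriginalType.DECIMAL)
        .precision(precision)
        .scale(scale)
        .named("decimal_type");
    return Statistics.getBuilderForReading(decimalType)
        .withMin(minBytes)
        .withMax(maxBytes)
        .withNumNulls(numNulls)
        .build();
  }

  public static void main(String[] args) {
    // 1.05 and 20.25 as unscaled values (105 and 2025) with precision 5, scale 2.
    byte[] min = new BigInteger("105").toByteArray();
    byte[] max = new BigInteger("2025").toByteArray();
    System.out.println(decimalStats(min, max, 0, 5, 2));
  }
}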
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 23245be..4020e96 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -595,6 +595,7 @@ drill.exec.options: {
     store.parquet.page-size: 1048576,
     store.parquet.reader.columnreader.async: false,
     store.parquet.reader.int96_as_timestamp: false,
+    store.parquet.reader.strings_signed_min_max: "",
     store.parquet.reader.pagereader.async: true,
     store.parquet.reader.pagereader.bufferedread: true,
     store.parquet.reader.pagereader.buffersize: 1048576,
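The new store.parquet.reader.strings_signed_min_max option defaults to an empty string, i.e. unset, in which case the parquet format plugin config decides whether signed string min / max statistics are used. A short sketch of that fallback, assuming the ParquetReaderConfig / ParquetFormatConfig classes introduced by this patch (the sketch class name is hypothetical):

package org.apache.drill.exec.store.parquet;

public class ReaderConfigDefaultsSketch {
  public static void main(String[] args) {
    // Format plugin level setting; no option manager is passed below,
    // so the session option stays at its "" default and the config value wins.
    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
    formatConfig.enableStringsSignedMinMax = true;

    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
        .withFormatConfig(formatConfig)
        .build();

    System.out.println(readerConfig.enableStringsSignedMinMax()); // true
  }
}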
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
index 0cbda03..3be2fe5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASPartitionFilter.java
@@ -82,7 +82,7 @@ public class TestCTASPartitionFilter extends PlanTestBase {
     test("use dfs.tmp");
     test("create table drill_3410 partition by (o_orderpriority) as select * from dfs.`multilevel/parquet`");
     String query = "select * from drill_3410 where (o_orderpriority = '1-URGENT' and o_orderkey = 10) or (o_orderpriority = '2-HIGH' or o_orderkey = 11)";
-    testIncludeFilter(query, 1, "Filter\\(", 34);
+    testIncludeFilter(query, 3, "Filter\\(", 34);
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
index 20438cd..7e68dd4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/FormatPluginSerDeTest.java
@@ -35,7 +35,9 @@ public class FormatPluginSerDeTest extends PlanTestBase {
       testPhysicalPlanSubmission(
           String.format("select * from table(cp.`%s`(type=>'parquet'))", "parquet/alltypes_required.parquet"),
           String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>false))", "parquet/alltypes_required.parquet"),
-          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true))", "parquet/alltypes_required.parquet"));
+          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true))", "parquet/alltypes_required.parquet"),
+          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true, enableStringsSignedMinMax=>false))", "parquet/alltypes_required.parquet"),
+          String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>false, enableStringsSignedMinMax=>true))", "parquet/alltypes_required.parquet"));
     } finally {
       resetSessionOption(ExecConstants.SLICE_TARGET);
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index e53c394..f43afb7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -54,7 +54,7 @@ public class TestFormatPluginOptionExtractor {
           assertEquals("(type: String, name: String)", d.presentParams());
           break;
         case "parquet":
-          assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates: boolean)", d.presentParams());
+          assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates: boolean, enableStringsSignedMinMax: boolean)", d.presentParams());
           break;
         case "json":
         case "sequencefile":
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 525e7ed..e80497a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.commons.io.filefilter.FalseFileFilter;
 import org.apache.commons.io.filefilter.TrueFileFilter;
 import org.apache.drill.PlanTestBase;
@@ -27,7 +26,6 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -41,7 +39,6 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.FileTime;
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -53,7 +50,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
   private static final String TABLE_NAME_2 = "parquetTable2";
 
   @BeforeClass
-  public static void copyData() throws Exception {
+  public static void copyData() {
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(TABLE_NAME_1));
     dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet2"), Paths.get(TABLE_NAME_2));
@@ -791,71 +788,6 @@ public class TestParquetMetadataCache extends PlanTestBase {
   }
 
   @Test // DRILL-4139
-  public void testVarCharWithNullsPartitionPruning() throws Exception {
-    final String intervalYearPartitionTable = "dfs.tmp.`varchar_optional_partition`";
-    try {
-      test("create table %s partition by (col_vrchr) as " +
-        "select * from cp.`parquet/alltypes_optional.parquet`", intervalYearPartitionTable);
-
-      String query = String.format("select * from %s where col_vrchr = 'Nancy Cloke'",
-        intervalYearPartitionTable);
-      int expectedRowCount = 1;
-
-      int actualRowCount = testSql(query);
-      assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
-
-      test("refresh table metadata %s", intervalYearPartitionTable);
-
-      actualRowCount = testSql(query);
-      assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-      PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
-    } finally {
-      test("drop table if exists %s", intervalYearPartitionTable);
-    }
-  }
-
-  @Ignore // Statistics for DECIMAL is not available (see PARQUET-1322).
-  @Test // DRILL-4139
-  public void testDecimalPartitionPruning() throws Exception {
-    List<String> ctasQueries = Lists.newArrayList();
-    // decimal stores as fixed_len_byte_array
-    ctasQueries.add("create table %s partition by (manager_id) as " +
-      "select * from cp.`parquet/fixedlenDecimal.parquet`");
-    // decimal stores as int32
-    ctasQueries.add("create table %s partition by (manager_id) as " +
-      "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
-      "from cp.`parquet/fixedlenDecimal.parquet`");
-    // decimal stores as int64
-    ctasQueries.add("create table %s partition by (manager_id) as " +
-      "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
-      "from cp.`parquet/fixedlenDecimal.parquet`");
-    final String decimalPartitionTable = "dfs.tmp.`decimal_optional_partition`";
-    for (String ctasQuery : ctasQueries) {
-      try {
-        test("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
-        test(ctasQuery, decimalPartitionTable);
-
-        String query = String.format("select * from %s where manager_id = 148", decimalPartitionTable);
-        int expectedRowCount = 6;
-
-        int actualRowCount = testSql(query);
-        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
-
-        test("refresh table metadata %s", decimalPartitionTable);
-
-        actualRowCount = testSql(query);
-        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
-        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
-      } finally {
-        test("drop table if exists %s", decimalPartitionTable);
-        test("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
-      }
-    }
-  }
-
-  @Test // DRILL-4139
   public void testIntWithNullsPartitionPruning() throws Exception {
     try {
       test("create table dfs.tmp.`t5/a` as\n" +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
new file mode 100644
index 0000000..7bbae41
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestParquetReaderConfig {
+
+  @Test
+  public void testDefaultsDeserialization() throws Exception {
+    ObjectMapper mapper = new ObjectMapper();
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().build(); // all defaults
+    String value = mapper.writeValueAsString(readerConfig);
+    assertEquals(ParquetReaderConfig.getDefaultInstance(), readerConfig); // compare with default instance
+    assertEquals("{}", value);
+
+    readerConfig = mapper.readValue(value, ParquetReaderConfig.class);
+    assertTrue(readerConfig.autoCorrectCorruptedDates()); // check that default value is restored
+
+    // change the default: set autoCorrectCorruptedDates to false
+    // keep the default: set enableStringsSignedMinMax to false
+    readerConfig = new ParquetReaderConfig(false, false, false, false, false);
+
+    value = mapper.writeValueAsString(readerConfig);
+    assertEquals("{\"autoCorrectCorruptedDates\":false}", value);
+  }
+
+  @Test
+  public void testAddConfigToConf() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, true);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, true);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, true);
+
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withConf(conf).build();
+    Configuration newConf = readerConfig.addCountersToConf(new Configuration());
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, "true");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, "true");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, "true");
+
+    conf = new Configuration();
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, false);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, false);
+    conf.setBoolean(ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, false);
+
+    readerConfig = ParquetReaderConfig.builder().withConf(conf).build();
+    newConf = readerConfig.addCountersToConf(new Configuration());
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_READ_COUNTER, "false");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_BYTES_TOTAL_COUNTER, "false");
+    checkConfigValue(newConf, ParquetReaderConfig.ENABLE_TIME_READ_COUNTER, "false");
+  }
+
+  @Test
+  public void testReadOptions() {
+    // set enableStringsSignedMinMax to true
+    ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true);
+    ParquetReadOptions readOptions = readerConfig.toReadOptions();
+    assertTrue(readOptions.useSignedStringMinMax());
+
+    // set enableStringsSignedMinMax to false
+    readerConfig = new ParquetReaderConfig(false, false, false, true, false);
+    readOptions = readerConfig.toReadOptions();
+    assertFalse(readOptions.useSignedStringMinMax());
+  }
+
+  @Test
+  public void testPriorityAssignmentForStringsSignedMinMax() throws Exception {
+    SystemOptionManager options = new SystemOptionManager(DrillConfig.create()).init();
+
+    // use value from format config
+    ParquetFormatConfig formatConfig = new ParquetFormatConfig();
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
+    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+
+    // change format config value
+    formatConfig.enableStringsSignedMinMax = true;
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
+    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+
+    // set option, option value should have higher priority
+    options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "false");
+
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
+    assertFalse(readerConfig.enableStringsSignedMinMax());
+
+    // set option as empty (undefined), config should have higher priority
+    options.setLocalOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "");
+    readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).withOptions(options).build();
+    assertEquals(formatConfig.isStringsSignedMinMaxEnabled(), readerConfig.enableStringsSignedMinMax());
+  }
+
+
+  private void checkConfigValue(Configuration conf, String name, String expectedValue) {
+    String actualValue = conf.get(name);
+    assertNotNull(actualValue);
+    assertEquals(expectedValue, actualValue);
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
new file mode 100644
index 0000000..85c641e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForDecimal.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestPushDownAndPruningForDecimal extends ClusterTest {
+
+  private static File fileStore;
+  private List<String> tablesToDrop = new ArrayList<>();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    /*
+      Contains two data files generated by Drill 1.13.0 version (before upgrade to Parquet lib 1.10.0).
+      Also contains .drill.parquet_metadata generated for these two files.
+
+      Schema:
+
+      message root {
+        required int32 part_int_32 (DECIMAL(5,2));
+        required int32 val_int_32 (DECIMAL(5,2));
+        required int64 part_int_64 (DECIMAL(16,2));
+        required int64 val_int_64 (DECIMAL(16,2));
+        required fixed_len_byte_array(12) part_fixed (DECIMAL(16,2));
+        required fixed_len_byte_array(12) val_fixed (DECIMAL(16,2));
+      }
+
+      Data:
+
+      0_0_1.parquet
+      -----------------------------------------------------------------------------
+      part_int_32 | val_int_32 | part_int_64 | val_int_64 | part_fixed | val_fixed
+      -----------------------------------------------------------------------------
+      1.00        | 1.05       | 1.00        | 1.05       | 1.00       | 1.05
+      1.00        | 10.0       | 1.00        | 10.0       | 1.00       | 10.0
+      1.00        | 10.25      | 1.00        | 10.25      | 1.00       | 10.25
+      -----------------------------------------------------------------------------
+
+      0_0_2.parquet
+      -----------------------------------------------------------------------------
+      part_int_32 | val_int_32 | part_int_64 | val_int_64 | part_fixed | val_fixed
+      -----------------------------------------------------------------------------
+      2.00        | 2.05       | 2.00        | 2.05       | 2.00       | 2.05
+      2.00        | 20.0       | 2.00        | 20.0       | 2.00       | 20.0
+      2.00        | 20.25      | 2.00        | 20.25      | 2.00       | 20.25
+      -----------------------------------------------------------------------------
+     */
+    fileStore = dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "decimal_gen_1_13_0"));
+    startCluster(builder);
+  }
+
+  @After
+  public void reset() {
+    // reset all session options that might have been used in the tests
+    client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+    client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+    client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+
+    // drop all tables
+    tablesToDrop.forEach(
+      t -> client.runSqlSilently(String.format("drop table if exists %s", t))
+    );
+    tablesToDrop.clear();
+  }
+
+  /**
+   * Check partition pruning for old and new int_32 and int_64 decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldNewIntDecimalPruningNoMeta() throws Exception {
+    String oldTable = createTable("old_int_decimal_pruning_no_meta", true);
+    String newTable = "dfs.`tmp`.new_int_decimal_pruning_no_meta";
+    tablesToDrop.add(newTable);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", newTable, oldTable)).run();
+    for (String column : Arrays.asList("part_int_32", "part_int_64")) {
+      for (String table : Arrays.asList(oldTable, newTable)) {
+        String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.00 as decimal(5, 2))", table, column);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("val_int_32", "val_int_64")
+          .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05"))
+          .baselineValues(new BigDecimal("10.00"), new BigDecimal("10.00"))
+          .baselineValues(new BigDecimal("10.25"), new BigDecimal("10.25"))
+          .go();
+      }
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new int_32 and int_64 decimal files
+   * using a Drill metadata file generated by the current Drill version (i.e. after the upgrade to Parquet 1.10.0).
+   */
+  @Test
+  public void testOldNewIntDecimalPruningWithMeta() throws Exception {
+    String oldTable = createTable("old_int_decimal_pruning_with_meta", true);
+    String newTable = "dfs.`tmp`.new_int_decimal_pruning_with_meta";
+    tablesToDrop.add(newTable);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", newTable, oldTable)).run();
+
+    for (String table : Arrays.asList(oldTable, newTable)) {
+      queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+      for (String column : Arrays.asList("part_int_32", "part_int_64")) {
+        String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(2.00 as decimal(5,2))", table, column);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("val_int_32", "val_int_64")
+          .baselineValues(new BigDecimal("2.05"), new BigDecimal("2.05"))
+          .baselineValues(new BigDecimal("20.00"), new BigDecimal("20.00"))
+          .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25"))
+          .go();
+      }
+    }
+  }
+
+  /**
+   * Check filter push down for old int_32 and int_64 decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldIntDecimalPushDownNoMeta() throws Exception {
+    String table = createTable("old_int_decimal_push_down_no_meta", true);
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      // push down does not work for old int decimal types because statistics are not read: PARQUET-1322
+      assertTrue(plan.contains("numRowGroups=2"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old int_32 and int_64 decimal files
+   * using an old Drill metadata file, i.e. one generated before the upgrade to Parquet 1.10.0.
+   */
+  @Test
+  public void testOldIntDecimalPushDownWithOldMeta() throws Exception {
+    String table = createTable("old_int_decimal_push_down_with_old_meta", false);
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old int_32 and int_64 decimal files
+   * using a new Drill metadata file, i.e. one generated after the upgrade to Parquet 1.10.0.
+   */
+  @Test
+  public void testOldIntDecimalPushDownWithNewMeta() throws Exception {
+    String table = createTable("old_int_decimal_push_down_with_new_meta", false);
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      // push down does not work for old int decimal types because statistics are not read: PARQUET-1322
+      assertTrue(plan.contains("numRowGroups=2"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for new int_32 and int_64 decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testNewIntDecimalPushDownNoMeta() throws Exception {
+    String dataTable = createTable("data_table_int_decimal_push_down_no_meta", true);
+    String table = "dfs.`tmp`.new_int_decimal_push_down_no_meta";
+    tablesToDrop.add(table);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", table, dataTable)).run();
+
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for new int_32 and int_64 decimal files
+   * using Drill metadata file.
+   */
+  @Test
+  public void testNewIntDecimalPushDownWithMeta() throws Exception {
+    String dataTable = createTable("data_table_int_decimal_push_down_no_meta", true);
+    String table = "dfs.`tmp`.new_int_decimal_push_down_with_meta";
+    tablesToDrop.add(table);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+    queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", table, dataTable)).run();
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+    for (String column : Arrays.asList("val_int_32", "val_int_64")) {
+      String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.0 as decimal(5, 2))",
+        table, column);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("val_int_32", "val_int_64")
+        .baselineValues(new BigDecimal("20.00"), new BigDecimal("20.00"))
+        .go();
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new fixed decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldNewFixedDecimalPruningNoMeta() throws Exception {
+    String oldTable = createTable("old_fixed_decimal_pruning_no_meta", true);
+    String newTable = "dfs.`tmp`.new_fixed_decimal_pruning_no_meta";
+    tablesToDrop.add(newTable);
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "fixed_len_byte_array");
+    queryBuilder().sql(String.format("create table %s partition by (part_fixed) as select part_fixed, val_fixed from %s",
+      newTable, oldTable)).run();
+
+    for (String table : Arrays.asList(oldTable, newTable)) {
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part_fixed", "val_fixed")
+          .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+          .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+          .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+          .go();
+      }
+    }
+  }
+
+  /**
+   * Check partition pruning for old fixed decimal files
+   * using the old Drill metadata file, i.e. generated before the upgrade to Parquet 1.10.0.
+   */
+  @Test
+  public void testOldFixedDecimalPruningWithOldMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_pruning_with_old_meta", false);
+    for (String optionValue : Arrays.asList("true", "false", "")) {
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+      String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+      assertFalse(plan.contains("Filter"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part_fixed", "val_fixed")
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check partition pruning for old and new fixed decimal files
+   * using a new Drill metadata file, i.e. one generated after the upgrade to Parquet 1.10.0.
+   */
+  @Test
+  public void testOldNewFixedDecimalPruningWithNewMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_pruning_with_new_meta", false);
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+    for (String optionValue : Arrays.asList("true", "false", "")) {
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+      String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(2.00 as decimal(5, 2))", table);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+      assertFalse(plan.contains("Filter"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part_fixed", "val_fixed")
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("2.05"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old fixed decimal files
+   * without using Drill metadata file.
+   */
+  @Test
+  public void testOldFixedDecimalPushDownNoMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_push_down_no_meta", true);
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table);
+    String plan = client.queryBuilder().sql(query).explainText();
+    // statistics for fixed decimal are not available in files generated prior to Parquet 1.10.0
+    assertTrue(plan.contains("numRowGroups=2"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+  }
+
+  /**
+   * Check filter push down for old fixed decimal files
+   * using the old Drill metadata file, i.e. created prior to the upgrade to Parquet 1.10.0.
+   */
+  @Test
+  public void testOldFixedDecimalPushDownWithOldMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_push_down_with_old_meta", false);
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("true", "numRowGroups=1");
+    properties.put("false", "numRowGroups=2");
+
+    for (Map.Entry<String, String> property : properties.entrySet()) {
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains(property.getValue()));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part_fixed", "val_fixed")
+        .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for old fixed decimal files
+   * using the new Drill metadata file, i.e. created after the upgrade to Parquet 1.10.0.
+   */
+  @Test
+  public void testOldFixedDecimalPushDownWithNewMeta() throws Exception {
+    String table = createTable("old_fixed_decimal_push_down_with_new_meta", true);
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(20.25 as decimal(5, 2))", table);
+
+    queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=2"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25"))
+      .go();
+  }
+
+  /**
+   * Check filter push down for fixed decimal generated after upgrade to parquet 1.10.0
+   * with and without using Drill metadata file.
+   */
+  @Test
+  public void testNewFixedDecimalPushDown() throws Exception {
+    String dataTable = createTable("data_table_for_fixed_decimal_push_down_no_meta", true);
+    String newTable = "dfs.`tmp`.new_fixed_decimal_pruning";
+    tablesToDrop.add(newTable);
+
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "fixed_len_byte_array");
+    queryBuilder().sql(String.format("create table %s partition by (part_fixed) as select part_fixed, val_fixed from %s",
+      newTable, dataTable)).run();
+
+    String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", newTable);
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+
+    queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+    // metadata for binary is allowed only after Drill 1.15.0
+    // set string signed option to true since test was written on Drill 1.15.0-SNAPSHOT version
+    client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
+    plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_fixed", "val_fixed")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+  }
+
+  /**
+   * Check partition pruning for binary decimal with and without
+   * using Drill metadata file.
+   */
+  @Test
+  public void testBinaryDecimalPruning() throws Exception {
+    String dataTable = createTable("data_table for_binary_decimal_pruning_no_meta", true);
+    String newTable = "dfs.`tmp`.binary_decimal_pruning";
+    tablesToDrop.add(newTable);
+
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "binary");
+    queryBuilder().sql(String.format("create table %s partition by (part_binary) "
+      + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run();
+
+    String query = String.format("select part_binary, val_binary from %s where part_binary = cast(1.00 as decimal(5, 2))", newTable);
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+    assertFalse(plan.contains("Filter"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+      .go();
+
+    queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+    plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+    assertFalse(plan.contains("Filter"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00"))
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25"))
+      .go();
+  }
+
+  /**
+   * Check filter push down for binary decimal with and without
+   * using Drill metadata file.
+   */
+  @Test
+  public void testBinaryDecimalPushDown() throws Exception {
+    String dataTable = createTable("data_table_for_binary_decimal_push_down_no_meta", true);
+    String newTable = "dfs.`tmp`.binary_decimal_push_down";
+    tablesToDrop.add(newTable);
+
+    client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+    client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "binary");
+    queryBuilder().sql(String.format("create table %s partition by (part_binary) "
+      + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run();
+
+    String query = String.format("select part_binary, val_binary from %s where val_binary = cast(1.05 as decimal(5, 2))", newTable);
+    String plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=false"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+
+    queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+    // metadata for binary is allowed only after Drill 1.15.0
+    // set string signed option to true, since test was written on Drill 1.15.0-SNAPSHOT version
+    client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
+    plan = client.queryBuilder().sql(query).explainText();
+    assertTrue(plan.contains("numRowGroups=1"));
+    assertTrue(plan.contains("usedMetadataFile=true"));
+
+    client.testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("part_binary", "val_binary")
+      .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05"))
+      .go();
+  }
+
+  /**
+   * Check partition pruning for decimals with different scale.
+   */
+  @Test
+  public void testDecimalPruningDifferentScale() throws Exception {
+    String dataTable = createTable("data_table_for_different_scale_pruning_check", true);
+    String newTable = "dfs.`tmp`.table_for_different_scale_pruning_check";
+    tablesToDrop.add(newTable);
+    queryBuilder().sql(String.format("create table %s partition by (part) as select part_int_32 as part, val_int_32 as val from %s",
+      newTable, dataTable)).run();
+
+    for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) {
+      String query = String.format("select part, val from %s where part = cast(2.0 as %s)", newTable, decimalType);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+      assertFalse(plan.contains("Filter"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part", "val")
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("2.05"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00"))
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25"))
+        .go();
+    }
+  }
+
+  /**
+   * Check filter push down for decimals with different scale.
+   */
+  @Test
+  public void testDecimalPushDownDifferentScale() throws Exception {
+    String dataTable = createTable("data_table_for_different_scale_push_down_check", true);
+    String newTable = "dfs.`tmp`.table_for_different_scale_push_down_check";
+    tablesToDrop.add(newTable);
+    queryBuilder().sql(String.format("create table %s partition by (part) as select part_int_32 as part, val_int_32 as val from %s",
+      newTable, dataTable)).run();
+
+    for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) {
+      String query = String.format("select part, val from %s where val = cast(20.0 as %s)", newTable, decimalType);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=false"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part", "val")
+        .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00"))
+        .go();
+    }
+  }
+
+  @Ignore("Statistics for DECIMAL that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0")
+  @Test
+  public void testDecimalPruningWithNullPartition() throws Exception {
+    List<String> ctasQueries = new ArrayList<>();
+    // decimal is stored as fixed_len_byte_array
+    ctasQueries.add("create table %s partition by (manager_id) as " +
+      "select * from cp.`parquet/fixedlenDecimal.parquet`");
+    // decimal is stored as int32
+    ctasQueries.add("create table %s partition by (manager_id) as " +
+      "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+      "from cp.`parquet/fixedlenDecimal.parquet`");
+    // decimal is stored as int64
+    ctasQueries.add("create table %s partition by (manager_id) as " +
+      "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " +
+      "from cp.`parquet/fixedlenDecimal.parquet`");
+    final String decimalPartitionTable = "dfs.tmp.`decimal_optional_partition`";
+    for (String ctasQuery : ctasQueries) {
+      try {
+        queryBuilder().sql(String.format(ctasQuery, decimalPartitionTable)).run();
+
+        String query = String.format("select * from %s where manager_id = 148", decimalPartitionTable);
+        int expectedRowCount = 6;
+
+        long actualRowCount = client.queryBuilder().sql(query).run().recordCount();
+        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
+        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=false"}, new String[]{"Filter"});
+
+        queryBuilder().sql(String.format("refresh table metadata %s", decimalPartitionTable)).run();
+
+        actualRowCount = client.queryBuilder().sql(query).run().recordCount();
+        assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount);
+        PlanTestBase.testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, new String[]{"Filter"});
+      } finally {
+        client.runSqlSilently(String.format("drop table if exists %s", decimalPartitionTable));
+      }
+    }
+  }
+
+  private String createTable(String tableName, boolean removeMetadata) throws IOException {
+    File rootDir = dirTestWatcher.getRootDir();
+    File tablePath = new File(rootDir, String.format("%s_%s", tableName, UUID.randomUUID()));
+    FileUtils.copyDirectory(fileStore, tablePath);
+    File metadata = new File(tablePath, ".drill.parquet_metadata");
+    if (removeMetadata) {
+      assertTrue(metadata.delete());
+    } else {
+      // metadata modification time should be later
+      // than the directory modification time, otherwise the metadata file will be regenerated
+      assertTrue(metadata.setLastModified(Instant.now().toEpochMilli()));
+    }
+    String table = String.format("dfs.`root`.`%s`", tablePath.getName());
+    tablesToDrop.add(table);
+    return table;
+  }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
new file mode 100644
index 0000000..a039468
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestPushDownAndPruningForVarchar.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestPushDownAndPruningForVarchar extends ClusterTest {
+
+  private static File fileStore;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    /*
+      Contains two data files generated by Drill 1.13.0
+      (before the upgrade to Parquet library 1.10.0).
+      Each file has two varchar columns.
+
+      0_0_1.parquet       0_0_2.parquet
+      -----------         -----------
+      part | val          part | val
+      -----------         -----------
+      A    | A1           B    | B1
+      A    | A2           B    | B2
+
+      Also contains .drill.parquet_metadata generated for these two files.
+     */
+    fileStore = dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "varchar_gen_1_13_0"));
+    startCluster(builder);
+  }
+
+  @Test
+  public void testOldFilesPruningWithAndWithoutMeta() throws Exception {
+    String tableNoMeta = createTable("varchar_pruning_old_without_meta", true);
+    String tableWithMeta = createTable("varchar_pruning_old_with_meta", false);
+
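+    // key: table name, value: expected usedMetadataFile flag in the query plan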
+    Map<String, String> properties = new HashMap<>();
+    properties.put(tableNoMeta, "false");
+    properties.put(tableWithMeta, "true");
+
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        for (String optionValue : Arrays.asList("true", "false", "")) {
+          client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+          String query = String.format("select * from %s where part = 'A'", property.getKey());
+          String plan = client.queryBuilder().sql(query).explainText();
+          assertTrue(plan.contains("numRowGroups=1"));
+          assertTrue(plan.contains(String.format("usedMetadataFile=%s", property.getValue())));
+          assertFalse(plan.contains("Filter"));
+
+          client.testBuilder()
+            .sqlQuery(query)
+            .unOrdered()
+            .baselineColumns("part", "val")
+            .baselineValues("A", "A1")
+            .baselineValues("A", "A2")
+            .go();
+        }
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+
+      properties.keySet().
+        forEach(k -> client.runSqlSilently(String.format("drop table if exists %s", k)));
+    }
+  }
+
+  @Test
+  public void testOldFilesPruningWithNewMeta() throws Exception {
+    String table = createTable("varchar_pruning_old_with_new_meta", true);
+
+    try {
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+        String query = String.format("select * from %s where part = 'A'", table);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .baselineValues("A", "A2")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testNewFilesPruningNoMeta() throws Exception {
+    String oldTable = createTable("varchar_pruning_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_pruning_new_without_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select * from %s where part = 'A'", newTable);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .baselineValues("A", "A2")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  @Test
+  public void testNewFilesPruningWithNewMeta() throws Exception {
+    String oldTable = createTable("varchar_pruning_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_pruning_new_with_new_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+      queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select * from %s where part = 'A'", newTable);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+        assertFalse(plan.contains("Filter"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .baselineValues("A", "A2")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  @Ignore("Statistics for VARCHAR that has all nulls is not available (PARQUET-1341). Requires upgrade to Parquet 1.11.0.")
+  @Test
+  public void testNewFilesPruningWithNullPartition() throws Exception {
+    String table = "dfs.`tmp`.`varchar_pruning_new_with_null_partition`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (col_vrchr) as " +
+        "select * from cp.`parquet/alltypes_optional.parquet`", table)).run();
+
+      String query = String.format("select * from %s where col_vrchr = 'Nancy Cloke'", table);
+
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("usedMetadataFile=false"));
+      assertFalse(plan.contains("Filter"));
+
+      QueryBuilder.QuerySummary result = client.queryBuilder().sql(query).run();
+      assertTrue(result.succeeded());
+      assertEquals(1, result.recordCount());
+
+      queryBuilder().sql(String.format("refresh table metadata %s", table)).run();
+
+      plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("usedMetadataFile=true"));
+      assertFalse(plan.contains("Filter"));
+
+      result = client.queryBuilder().sql(query).run();
+      assertTrue(result.succeeded());
+      assertEquals(1, result.recordCount());
+    } finally {
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testOldFilesPushDownNoMeta() throws Exception {
+    String table = createTable("varchar_push_down_old_without_meta", true);
+
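+    // key: strings_signed_min_max option value, value: expected number of row groups in the query plan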
+    Map<String, String> properties = new HashMap<>();
+    properties.put("true", "numRowGroups=1");
+    properties.put("false", "numRowGroups=2");
+
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+        String query = String.format("select * from %s where val = 'A1'", table);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains(property.getValue()));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testOldFilesPushDownWithOldMeta() throws Exception {
+    String table = createTable("varchar_push_down_old_with_old_meta", false);
+
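+    // key: strings_signed_min_max option value, value: expected number of row groups in the query plan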
+    Map<String, String> properties = new HashMap<>();
+    properties.put("false", "numRowGroups=2");
+    properties.put("true", "numRowGroups=1");
+
+    try {
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());
+        String query = String.format("select * from %s where val = 'A1'", table);
+
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains(property.getValue()));
+        assertTrue(plan.contains("usedMetadataFile=true"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", table));
+    }
+  }
+
+  @Test
+  public void testNewFilesPushDownNoMeta() throws Exception {
+    String oldTable = createTable("varchar_push_down_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_push_down_new_without_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+
+      for (String optionValue : Arrays.asList("true", "false", "")) {
+        client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue);
+        String query = String.format("select * from %s where val = 'A1'", newTable);
+        String plan = client.queryBuilder().sql(query).explainText();
+        assertTrue(plan.contains("numRowGroups=1"));
+        assertTrue(plan.contains("usedMetadataFile=false"));
+
+        client.testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("part", "val")
+          .baselineValues("A", "A1")
+          .go();
+      }
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  @Test
+  public void testNewFilesPushDownWithMeta() throws Exception {
+    String oldTable = createTable("varchar_push_down_old_without_meta", true);
+    String newTable = "dfs.`tmp`.`varchar_push_down_new_with_meta`";
+
+    try {
+      queryBuilder().sql(String.format("create table %s partition by (part) as select * from %s", newTable, oldTable)).run();
+      queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run();
+      String query = String.format("select * from %s where val = 'A1'", newTable);
+      // metadata for binary columns is allowed only starting from Drill 1.15.0
+      // set the strings signed min/max option to true to read it on the current Drill 1.15.0-SNAPSHOT version
+      client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true");
+      String plan = client.queryBuilder().sql(query).explainText();
+      assertTrue(plan.contains("numRowGroups=1"));
+      assertTrue(plan.contains("usedMetadataFile=true"));
+
+      client.testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("part", "val")
+        .baselineValues("A", "A1")
+        .go();
+    } finally {
+      client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
+      client.runSqlSilently(String.format("drop table if exists %s", oldTable));
+      client.runSqlSilently(String.format("drop table if exists %s", newTable));
+    }
+  }
+
+  private String createTable(String tableName, boolean removeMetadata) throws IOException {
+    File rootDir = dirTestWatcher.getRootDir();
+    File table = new File(rootDir, String.format("%s_%s", tableName, UUID.randomUUID()));
+    FileUtils.copyDirectory(fileStore, table);
+    File metadata = new File(table, ".drill.parquet_metadata");
+    if (removeMetadata) {
+      assertTrue(metadata.delete());
+    } else {
+      // metadata modification time should be later
+      // than the directory modification time, otherwise the metadata file will be regenerated
+      assertTrue(metadata.setLastModified(Instant.now().toEpochMilli()));
+    }
+    return String.format("dfs.`root`.`%s`", table.getName());
+  }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index 8e045a2..996898e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -167,7 +167,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   private void configureZk() {
     // Start ZK if requested.
 
-    String zkConnect = null;
+    String zkConnect;
     if (builder.zkHelper != null) {
       // Case where the test itself started ZK and we're only using it.
 
@@ -594,18 +594,22 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   /**
    * Convert a Java object (typically a boxed scalar) to a string
    * for use in SQL. Quotes strings but just converts others to
-   * string format.
+   * string format. If the value to encode is null, returns null.
    *
    * @param value the value to encode
    * @return the SQL-acceptable string equivalent
    */
 
   public static String stringify(Object value) {
+    if (value == null) {
+      return null;
+    }
+
     if (value instanceof String) {
       return "'" + value + "'";
-    } else {
-      return value.toString();
     }
+
+    return value.toString();
   }
 
   public static String getResource(String resource) throws IOException {
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/.drill.parquet_metadata b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/.drill.parquet_metadata
new file mode 100644
index 0000000..24c3fc1
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/.drill.parquet_metadata
@@ -0,0 +1,146 @@
+{
+  "metadata_version" : "3.3",
+  "columnTypeInfo" : {
+    "`val_int_64`" : {
+      "name" : [ "val_int_64" ],
+      "primitiveType" : "INT64",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`val_int_32`" : {
+      "name" : [ "val_int_32" ],
+      "primitiveType" : "INT32",
+      "originalType" : "DECIMAL",
+      "precision" : 5,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`part_fixed`" : {
+      "name" : [ "part_fixed" ],
+      "primitiveType" : "FIXED_LEN_BYTE_ARRAY",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`part_int_64`" : {
+      "name" : [ "part_int_64" ],
+      "primitiveType" : "INT64",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`part_int_32`" : {
+      "name" : [ "part_int_32" ],
+      "primitiveType" : "INT32",
+      "originalType" : "DECIMAL",
+      "precision" : 5,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    },
+    "`val_fixed`" : {
+      "name" : [ "val_fixed" ],
+      "primitiveType" : "FIXED_LEN_BYTE_ARRAY",
+      "originalType" : "DECIMAL",
+      "precision" : 16,
+      "scale" : 2,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 0
+    }
+  },
+  "files" : [ {
+    "path" : "0_0_2.parquet",
+    "length" : 1072,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 390,
+      "rowCount" : 3,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part_int_32" ],
+        "minValue" : 200,
+        "maxValue" : 200,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_32" ],
+        "minValue" : 205,
+        "maxValue" : 2025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_int_64" ],
+        "minValue" : 200,
+        "maxValue" : 200,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_64" ],
+        "minValue" : 205,
+        "maxValue" : 2025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAADI",
+        "maxValue" : "AAAAAAAAAAAAAADI",
+        "nulls" : 0
+      }, {
+        "name" : [ "val_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAADN",
+        "maxValue" : "AAAAAAAAAAAAAAfp",
+        "nulls" : 0
+      } ]
+    } ]
+  }, {
+    "path" : "0_0_1.parquet",
+    "length" : 1072,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 390,
+      "rowCount" : 3,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part_int_32" ],
+        "minValue" : 100,
+        "maxValue" : 100,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_32" ],
+        "minValue" : 105,
+        "maxValue" : 1025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_int_64" ],
+        "minValue" : 100,
+        "maxValue" : 100,
+        "nulls" : 0
+      }, {
+        "name" : [ "val_int_64" ],
+        "minValue" : 105,
+        "maxValue" : 1025,
+        "nulls" : 0
+      }, {
+        "name" : [ "part_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAABk",
+        "maxValue" : "AAAAAAAAAAAAAABk",
+        "nulls" : 0
+      }, {
+        "name" : [ "val_fixed" ],
+        "minValue" : "AAAAAAAAAAAAAABp",
+        "maxValue" : "AAAAAAAAAAAAAAQB",
+        "nulls" : 0
+      } ]
+    } ]
+  } ],
+  "directories" : [ ],
+  "drillVersion" : "1.13.0"
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_1.parquet
new file mode 100644
index 0000000..c5f6e89
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_1.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_2.parquet
new file mode 100644
index 0000000..cd8a930
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/decimal_gen_1_13_0/0_0_2.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/.drill.parquet_metadata b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/.drill.parquet_metadata
new file mode 100644
index 0000000..9d8525b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/.drill.parquet_metadata
@@ -0,0 +1,70 @@
+{
+  "metadata_version" : "3.3",
+  "columnTypeInfo" : {
+    "`val`" : {
+      "name" : [ "val" ],
+      "primitiveType" : "BINARY",
+      "originalType" : "UTF8",
+      "precision" : 0,
+      "scale" : 0,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 1
+    },
+    "`part`" : {
+      "name" : [ "part" ],
+      "primitiveType" : "BINARY",
+      "originalType" : "UTF8",
+      "precision" : 0,
+      "scale" : 0,
+      "repetitionLevel" : 0,
+      "definitionLevel" : 1
+    }
+  },
+  "files" : [ {
+    "path" : "0_0_2.parquet",
+    "length" : 368,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 90,
+      "rowCount" : 2,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part" ],
+        "minValue" : "Qg==",
+        "maxValue" : "Qg==",
+        "nulls" : 0
+      }, {
+        "name" : [ "val" ],
+        "minValue" : "QjE=",
+        "maxValue" : "QjI=",
+        "nulls" : 0
+      } ]
+    } ]
+  }, {
+    "path" : "0_0_1.parquet",
+    "length" : 368,
+    "rowGroups" : [ {
+      "start" : 4,
+      "length" : 90,
+      "rowCount" : 2,
+      "hostAffinity" : {
+        "localhost" : 1.0
+      },
+      "columns" : [ {
+        "name" : [ "part" ],
+        "minValue" : "QQ==",
+        "maxValue" : "QQ==",
+        "nulls" : 0
+      }, {
+        "name" : [ "val" ],
+        "minValue" : "QTE=",
+        "maxValue" : "QTI=",
+        "nulls" : 0
+      } ]
+    } ]
+  } ],
+  "directories" : [ ],
+  "drillVersion" : "1.13.0"
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_1.parquet b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_1.parquet
new file mode 100644
index 0000000..bf7f8d8
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_1.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_2.parquet b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_2.parquet
new file mode 100644
index 0000000..c76ed91
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/varchar_gen_1_13_0/0_0_2.parquet differ
diff --git a/pom.xml b/pom.xml
index a75e251..902e3c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,7 +63,6 @@
     <hamcrest.core.version>1.3</hamcrest.core.version>
     <maven.embedder.version>3.5.3</maven.embedder.version>
     <curator.version>2.7.1</curator.version>
-    <parquet.hadoop.version>1.8.1</parquet.hadoop.version>
     <wiremock.standalone.version>2.5.1</wiremock.standalone.version>
     <jmockit.version>1.39</jmockit.version>
     <logback.version>1.0.13</logback.version>
@@ -376,6 +375,7 @@
             <exclude>**/*.autotools</exclude>
             <exclude>**/*.cproject</exclude>
             <exclude>**/*.drill</exclude>
+            <exclude>**/.drill.parquet_metadata</exclude>
             <!-- TODO DRILL-4336: try to avoid the need to add this -->
             <exclude>dependency-reduced-pom.xml</exclude>
           </excludes>
@@ -646,6 +646,7 @@
               <exclude>**/NOTICE</exclude>
               <exclude>KEYS</exclude>
               <exclude>header</exclude>
+              <exclude>**/.drill.parquet_metadata</exclude>
               <!-- TODO DRILL-4336: try to avoid the need to add this -->
               <exclude>dependency-reduced-pom.xml</exclude>
             </excludes>