Posted to commits@impala.apache.org by jo...@apache.org on 2021/04/04 00:13:38 UTC

[impala] branch master updated (b40870d -> 1231208)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from b40870d  IMPALA-10624: TestIcebergTable::test_alter_iceberg_tables failed by stale file format
     new 1ab1143  Bump up the GBN number to 11920537
     new 1231208  IMPALA-10494: Making use of the min/max column stats to improve min/max filters

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/catalog-op-executor.cc                 |  18 +-
 be/src/exec/filter-context.cc                      |  55 +++
 be/src/exec/filter-context.h                       |   8 +
 be/src/exec/hdfs-scanner.h                         |   8 +-
 be/src/exec/incr-stats-util-test.cc                |  74 ++-
 be/src/exec/incr-stats-util.cc                     |  82 +++-
 be/src/exec/incr-stats-util.h                      |  15 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  65 ++-
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  14 +
 be/src/exec/partitioned-hash-join-builder.cc       |  36 +-
 be/src/service/hs2-util.cc                         | 114 ++++-
 be/src/service/hs2-util.h                          |  14 +
 be/src/service/query-options.cc                    |   8 +
 be/src/service/query-options.h                     |   6 +-
 be/src/util/min-max-filter-test.cc                 |  18 +
 be/src/util/min-max-filter.cc                      |  63 ++-
 be/src/util/min-max-filter.h                       |   2 +-
 bin/impala-config.sh                               |  24 +-
 common/thrift/CatalogObjects.thrift                |   9 +
 common/thrift/Frontend.thrift                      |   1 +
 common/thrift/ImpalaService.thrift                 |   6 +
 common/thrift/PlanNodes.thrift                     |   5 +
 common/thrift/Query.thrift                         |   6 +
 fe/pom.xml                                         |   8 +
 .../apache/impala/analysis/ComputeStatsStmt.java   |  53 ++
 .../org/apache/impala/analysis/ShowStatsStmt.java  |   7 +-
 .../org/apache/impala/catalog/ColumnStats.java     | 540 ++++++++++++++++++++-
 .../org/apache/impala/catalog/HdfsFileFormat.java  |   7 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |  13 +
 .../org/apache/impala/planner/HdfsScanNode.java    |  16 +-
 .../impala/planner/RuntimeFilterGenerator.java     |  31 +-
 .../apache/impala/service/CatalogOpExecutor.java   |   8 +-
 .../java/org/apache/impala/service/Frontend.java   |  42 +-
 .../org/apache/impala/service/JniFrontend.java     |   3 +-
 .../java/org/apache/impala/util/MetaStoreUtil.java |   9 +
 .../impala/catalog/FileMetadataLoaderTest.java     |   3 +-
 java/shaded-deps/hive-exec/pom.xml                 |   4 +
 java/test-hive-udfs/pom.xml                        |   4 +
 .../QueryTest/compute-stats-column-minmax.test     |  95 ++++
 .../queries/QueryTest/overlap_min_max_filters.test | 103 ++--
 tests/metadata/test_compute_stats.py               |  17 +
 tests/query_test/test_runtime_filters.py           |   4 +-
 42 files changed, 1463 insertions(+), 155 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/compute-stats-column-minmax.test

[impala] 01/02: Bump up the GBN number to 11920537

commit 1ab1143e98ff09610dff82d1795cf103659ffe97
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Tue Mar 23 12:09:57 2021 -0700

    Bump up the GBN number to 11920537
    
    This change bumps up the GBN to 11920537, which includes several
    Hive changes needed to support Catalogd's HMS endpoint for
    external frontends.
    
    Additionally, it excludes some dependencies from the pom.xml
    that are not uploaded to the toolchain by default.
    
    After the GBN bump, Hive no longer writes '_orc_acid_version'
    files, so FileMetadataLoaderTest had to be modified accordingly.
    
    Change-Id: If88ceeaffc94e5bedf2c9953122109e20663f743
    Reviewed-on: http://gerrit.cloudera.org:8080/17243
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/impala-config.sh                               | 24 +++++++++++-----------
 fe/pom.xml                                         |  8 ++++++++
 .../impala/catalog/FileMetadataLoaderTest.java     |  3 +--
 java/shaded-deps/hive-exec/pom.xml                 |  4 ++++
 java/test-hive-udfs/pom.xml                        |  4 ++++
 5 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 4cacc3c..2e3166e 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -176,20 +176,20 @@ export IMPALA_TOOLCHAIN_HOST
 # and adjust existing tests that mentions them:
 # * HIVE-23995
 # * HIVE-24175
-export CDP_BUILD_NUMBER=7049391
+export CDP_BUILD_NUMBER=11920537
 export CDP_MAVEN_REPOSITORY=\
 "https://${IMPALA_TOOLCHAIN_HOST}/build/cdp_components/${CDP_BUILD_NUMBER}/maven"
-export CDP_AVRO_JAVA_VERSION=1.8.2.7.2.7.0-44
-export CDP_HADOOP_VERSION=3.1.1.7.2.7.0-44
-export CDP_HBASE_VERSION=2.2.6.7.2.7.0-44
-export CDP_HIVE_VERSION=3.1.3000.7.2.7.0-44
-export CDP_ICEBERG_VERSION=0.9.1.7.2.7.0-44
-export CDP_KNOX_VERSION=1.3.0.7.2.7.0-44
-export CDP_OZONE_VERSION=1.0.0.7.2.7.0-44
-export CDP_PARQUET_VERSION=1.10.99.7.2.7.0-44
-export CDP_RANGER_VERSION=2.1.0.7.2.7.0-44
-export CDP_TEZ_VERSION=0.9.1.7.2.7.0-44
-export CDP_GCS_VERSION=2.1.2.7.2.7.0-44
+export CDP_AVRO_JAVA_VERSION=1.8.2.7.2.9.0-146
+export CDP_HADOOP_VERSION=3.1.1.7.2.9.0-146
+export CDP_HBASE_VERSION=2.2.6.7.2.9.0-146
+export CDP_HIVE_VERSION=3.1.3000.7.2.9.0-146
+export CDP_ICEBERG_VERSION=0.9.1.7.2.9.0-146
+export CDP_KNOX_VERSION=1.3.0.7.2.9.0-146
+export CDP_OZONE_VERSION=1.0.0.7.2.9.0-146
+export CDP_PARQUET_VERSION=1.10.99.7.2.9.0-146
+export CDP_RANGER_VERSION=2.1.0.7.2.9.0-146
+export CDP_TEZ_VERSION=0.9.1.7.2.9.0-146
+export CDP_GCS_VERSION=2.1.2.7.2.9.0-146
 
 export ARCH_NAME=$(uname -p)
 
diff --git a/fe/pom.xml b/fe/pom.xml
index e100ff3..73be13c 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -111,6 +111,10 @@ under the License.
       <artifactId>ranger-plugins-common</artifactId>
       <version>${ranger.version}</version>
       <exclusions>
+        <exclusion>
+          <groupId>org.apache.solr</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
         <!-- Exclude json-smart to pin the version -->
         <exclusion>
           <groupId>net.minidev</groupId>
@@ -133,6 +137,10 @@ under the License.
           <artifactId>shiro-core</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.apache.solr</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>org.eclipse.jetty</groupId>
           <artifactId>*</artifactId>
         </exclusion>
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 9ccdc0c..5fd2f7f 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -159,8 +159,7 @@ public class FileMetadataLoaderTest {
     fml.load();
     // Only load the compacted file.
     assertEquals(1, fml.getStats().loadedFiles);
-    // 2 * 8 files since the hidden '_orc_acid_version' is filtered out later.
-    assertEquals(16, fml.getStats().filesSupersededByAcidState);
+    assertEquals(8, fml.getStats().filesSupersededByAcidState);
   }
 
   @Test
diff --git a/java/shaded-deps/hive-exec/pom.xml b/java/shaded-deps/hive-exec/pom.xml
index 639abc6..95030c6 100644
--- a/java/shaded-deps/hive-exec/pom.xml
+++ b/java/shaded-deps/hive-exec/pom.xml
@@ -57,6 +57,10 @@ the same dependencies
           <groupId>net.minidev</groupId>
           <artifactId>json-smart</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.atlas</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
   </dependencies>
diff --git a/java/test-hive-udfs/pom.xml b/java/test-hive-udfs/pom.xml
index eb63bb6..6ef7f24 100644
--- a/java/test-hive-udfs/pom.xml
+++ b/java/test-hive-udfs/pom.xml
@@ -70,6 +70,10 @@ under the License.
           <groupId>org.apache.hive.shims</groupId>
           <artifactId>hive-shims-0.20</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.atlas</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

[impala] 02/02: IMPALA-10494: Making use of the min/max column stats to improve min/max filters

commit 1231208da7104c832c13f272d1e5b8f554d29337
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Wed Feb 17 10:00:28 2021 -0500

    IMPALA-10494: Making use of the min/max column stats to improve min/max filters
    
    This patch adds the functionality to compute the minimal and the maximal
    value of integer, float/double, date, and decimal columns for Parquet
    tables, and to use the new stats to discard min/max filters, in both
    hash join builders and Parquet scanners, when a filter's coverage is
    too close to the actual range defined by the column min and max.
    
    The computation and display of the new column min/max stats are
    controlled by two new Boolean query options (both default to false):
      1. compute_column_minmax_stats
      2. show_column_minmax_stats
    
    Usage examples:
    
      set compute_column_minmax_stats=true;
      compute stats tpcds_parquet.store_sales;
    
      set show_column_minmax_stats=true;
      show column stats tpcds_parquet.store_sales;
    
    +-----------------------+--------------+-...-------+---------+---------+
    | Column                | Type         |   #Falses | Min     | Max     |
    +-----------------------+--------------+-...-------+---------+---------+
    | ss_sold_time_sk       | INT          |   -1      | 28800   | 75599   |
    | ss_item_sk            | BIGINT       |   -1      | 1       | 18000   |
    | ss_customer_sk        | INT          |   -1      | 1       | 100000  |
    | ss_cdemo_sk           | INT          |   -1      | 15      | 1920797 |
    | ss_hdemo_sk           | INT          |   -1      | 1       | 7200    |
    | ss_addr_sk            | INT          |   -1      | 1       | 50000   |
    | ss_store_sk           | INT          |   -1      | 1       | 10      |
    | ss_promo_sk           | INT          |   -1      | 1       | 300     |
    | ss_ticket_number      | BIGINT       |   -1      | 1       | 240000  |
    | ss_quantity           | INT          |   -1      | 1       | 100     |
    | ss_wholesale_cost     | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_list_price         | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_sales_price        | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_ext_discount_amt   | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_ext_sales_price    | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_ext_wholesale_cost | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_ext_list_price     | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_ext_tax            | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_coupon_amt         | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_net_paid           | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_net_paid_inc_tax   | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_net_profit         | DECIMAL(7,2) |   -1      | -1      | -1      |
    | ss_sold_date_sk       | INT          |   -1      | 2450816 | 2452642 |
    +-----------------------+--------------+-...-------+---------+---------+
    
    Only the min/max values for non-partition columns are stored in HMS.
    The min/max values for partition columns are computed in the coordinator.
    
    The min-max filters, in C++ class or protobuf form, are augmented to
    handle the always-true state better: once always true is set, the
    actual min and max values in the filter are no longer populated.
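    
    A minimal sketch of the resulting serialization contract, shown for the
    int specialization (the actual macro-generated code is in the
    min-max-filter.cc hunk below):
    
      void ToProtobuf(MinMaxFilterPB* protobuf) const {
        // Bounds are populated only while the filter still carries them.
        if (!AlwaysFalse() && !AlwaysTrue()) {
          protobuf->mutable_min()->set_int_val(min_);
          protobuf->mutable_max()->set_int_val(max_);
        }
        protobuf->set_always_false(AlwaysFalse());
        protobuf->set_always_true(AlwaysTrue());
      }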
    
    Testing:
     - Added new compute/show stats tests in
       compute-stats-column-minmax.test;
     - Added new tests in overlap_min_max_filters.test to demonstrate the
       usefulness of column stats for quickly disabling useless filters in
       both the hash join builder and the Parquet scanner;
     - Added tests in min-max-filter-test.cc to demonstrate that the Or()
       method, ToProtobuf() and the constructor handle the always-true flag
       correctly;
     - Tested with TPCDS 3TB to demonstrate the usefulness of the min
       and max column stats in disabling min/max filters that are not
       useful;
     - Ran core tests.
    
    TODO:
     1. IMPALA-10602: Intersection of multiple min/max filters when
        applying to common equi-join columns;
     2. IMPALA-10601: Creating lineitem_orderkey_only table in
        tpch_parquet database;
     3. IMPALA-10603: Enable min/max overlap filter feature for Iceberg
        tables with Parquet data files;
     4. IMPALA-10617: Compute min/max column stats beyond parquet tables.
    
    Change-Id: I08581b44419bb8da5940cbf98502132acd1c86df
    Reviewed-on: http://gerrit.cloudera.org:8080/17075
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/catalog-op-executor.cc                 |  18 +-
 be/src/exec/filter-context.cc                      |  55 +++
 be/src/exec/filter-context.h                       |   8 +
 be/src/exec/hdfs-scanner.h                         |   8 +-
 be/src/exec/incr-stats-util-test.cc                |  74 ++-
 be/src/exec/incr-stats-util.cc                     |  82 +++-
 be/src/exec/incr-stats-util.h                      |  15 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  65 ++-
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  14 +
 be/src/exec/partitioned-hash-join-builder.cc       |  36 +-
 be/src/service/hs2-util.cc                         | 114 ++++-
 be/src/service/hs2-util.h                          |  14 +
 be/src/service/query-options.cc                    |   8 +
 be/src/service/query-options.h                     |   6 +-
 be/src/util/min-max-filter-test.cc                 |  18 +
 be/src/util/min-max-filter.cc                      |  63 ++-
 be/src/util/min-max-filter.h                       |   2 +-
 common/thrift/CatalogObjects.thrift                |   9 +
 common/thrift/Frontend.thrift                      |   1 +
 common/thrift/ImpalaService.thrift                 |   6 +
 common/thrift/PlanNodes.thrift                     |   5 +
 common/thrift/Query.thrift                         |   6 +
 .../apache/impala/analysis/ComputeStatsStmt.java   |  53 ++
 .../org/apache/impala/analysis/ShowStatsStmt.java  |   7 +-
 .../org/apache/impala/catalog/ColumnStats.java     | 540 ++++++++++++++++++++-
 .../org/apache/impala/catalog/HdfsFileFormat.java  |   7 +
 .../java/org/apache/impala/catalog/HdfsTable.java  |  13 +
 .../org/apache/impala/planner/HdfsScanNode.java    |  16 +-
 .../impala/planner/RuntimeFilterGenerator.java     |  31 +-
 .../apache/impala/service/CatalogOpExecutor.java   |   8 +-
 .../java/org/apache/impala/service/Frontend.java   |  42 +-
 .../org/apache/impala/service/JniFrontend.java     |   3 +-
 .../java/org/apache/impala/util/MetaStoreUtil.java |   9 +
 .../QueryTest/compute-stats-column-minmax.test     |  95 ++++
 .../queries/QueryTest/overlap_min_max_filters.test | 103 ++--
 tests/metadata/test_compute_stats.py               |  17 +
 tests/query_test/test_runtime_filters.py           |   4 +-
 37 files changed, 1434 insertions(+), 141 deletions(-)

diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index d00b0eb..05fe0c8 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -312,9 +312,9 @@ void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema,
   // Set per-column stats. For a column at position i in its source table,
   // the NDVs and the number of NULLs are at position i and i + 1 of the
   // col_stats_row, respectively. Positions i + 2 and i + 3 contain the max/avg
-  // length for string columns, Positions i+4 and i+5 contains the numTrues/numFalses
-  // and -1 for non-string columns.
-  for (int i = 0; i < col_stats_row.colVals.size(); i += 6) {
+  // length for string columns (-1 for non-string columns). Positions i+4 and i+5
+  // contain the numTrues/numFalses (-1 for non-Boolean columns). Positions i+6
+  // and i+7 contain the min and the max.
+  for (int i = 0; i < col_stats_row.colVals.size(); i += 8) {
     TColumnStats col_stats;
     col_stats.__set_num_distinct_values(col_stats_row.colVals[i].i64Val.value);
     col_stats.__set_num_nulls(col_stats_row.colVals[i + 1].i64Val.value);
@@ -322,6 +322,18 @@ void CatalogOpExecutor::SetColumnStats(const TTableSchema& col_stats_schema,
     col_stats.__set_avg_size(col_stats_row.colVals[i + 3].doubleVal.value);
     col_stats.__set_num_trues(col_stats_row.colVals[i + 4].i64Val.value);
     col_stats.__set_num_falses(col_stats_row.colVals[i + 5].i64Val.value);
+    // By default, the low and the high value in TColumnStats are unset. Set
+    // each only when the converted value actually has a field set.
+    TColumnValue low_value = ConvertToTColumnValue(
+        col_stats_schema.columns[i + 6], col_stats_row.colVals[i + 6]);
+    if (isOneFieldSet(low_value)) {
+      col_stats.__set_low_value(low_value);
+    }
+    TColumnValue high_value = ConvertToTColumnValue(
+        col_stats_schema.columns[i + 7], col_stats_row.colVals[i + 7]);
+    if (isOneFieldSet(high_value)) {
+      col_stats.__set_high_value(high_value);
+    }
     params->column_stats[col_stats_schema.columns[i].columnName] = col_stats;
   }
   params->__isset.column_stats = true;
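
As an illustration of the stride-8 layout that SetColumnStats() now walks
(this note is not part of the patch): for a source table with columns (a, b),
col_stats_row.colVals is flattened as

  [ a.ndv, a.num_nulls, a.max_len, a.avg_len, a.num_trues, a.num_falses,
    a.min, a.max,
    b.ndv, b.num_nulls, b.max_len, b.avg_len, b.num_trues, b.num_falses,
    b.min, b.max ]

so the stats of the j-th source column start at colVals index 8 * j, with the
new min and max at offsets +6 and +7 within each group.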
diff --git a/be/src/exec/filter-context.cc b/be/src/exec/filter-context.cc
index 28af515..bc92c46 100644
--- a/be/src/exec/filter-context.cc
+++ b/be/src/exec/filter-context.cc
@@ -23,6 +23,7 @@
 #include "runtime/tuple-row.h"
 #include "util/min-max-filter.h"
 #include "util/runtime-profile-counters.h"
+#include "service/hs2-util.h"
 
 using namespace impala;
 using namespace strings;
@@ -426,3 +427,57 @@ bool FilterContext::CheckForAlwaysFalse(const std::string& stats_name,
   }
   return false;
 }
+
+// Return true if both the filter and the column min/max stats exist and the overlap
+// of the filter range [min, max] with the column stats range [min, max] is at least
+// 'threshold'. Return false otherwise.
+bool FilterContext::ShouldRejectFilterBasedOnColumnStats(
+    const TRuntimeFilterTargetDesc& desc, MinMaxFilter* minmax_filter, float threshold) {
+  if (!minmax_filter) {
+    return false;
+  }
+  const TColumnValue& column_low_value = desc.low_value;
+  const TColumnValue& column_high_value = desc.high_value;
+  ColumnType col_type = ColumnType::FromThrift(desc.target_expr.nodes[0].type);
+  float ratio = 0.0;
+  switch (col_type.type) {
+    case PrimitiveType::TYPE_TINYINT:
+      if (!column_low_value.__isset.byte_val || !column_high_value.__isset.byte_val)
+        return false;
+      ratio = minmax_filter->ComputeOverlapRatio(col_type,
+          (void*)&column_low_value.byte_val, (void*)&column_high_value.byte_val);
+      break;
+    case PrimitiveType::TYPE_SMALLINT:
+      if (!column_low_value.__isset.short_val || !column_high_value.__isset.short_val)
+        return false;
+      ratio = minmax_filter->ComputeOverlapRatio(col_type,
+          (void*)&column_low_value.short_val, (void*)&column_high_value.short_val);
+      break;
+    case PrimitiveType::TYPE_INT:
+      if (!column_low_value.__isset.int_val || !column_high_value.__isset.int_val)
+        return false;
+      ratio = minmax_filter->ComputeOverlapRatio(col_type,
+          (void*)&column_low_value.int_val, (void*)&column_high_value.int_val);
+      break;
+    case PrimitiveType::TYPE_BIGINT:
+      if (!column_low_value.__isset.long_val || !column_high_value.__isset.long_val)
+        return false;
+      ratio = minmax_filter->ComputeOverlapRatio(col_type,
+          (void*)&column_low_value.long_val, (void*)&column_high_value.long_val);
+      break;
+    case PrimitiveType::TYPE_FLOAT:
+    case PrimitiveType::TYPE_DOUBLE:
+      if (!column_low_value.__isset.double_val || !column_high_value.__isset.double_val)
+        return false;
+      ratio = minmax_filter->ComputeOverlapRatio(col_type,
+          (void*)&column_low_value.double_val, (void*)&column_high_value.double_val);
+      break;
+    // Neither timestamp nor date is supported yet, since their low/high stats
+    // can't be stored in HMS.
+    case PrimitiveType::TYPE_TIMESTAMP:
+    case PrimitiveType::TYPE_DATE:
+    default:
+      return false;
+  }
+  return ratio >= threshold;
+}
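
The overlap computation itself lives in MinMaxFilter::ComputeOverlapRatio(),
which this diff does not show. A minimal sketch of the idea, assuming a
simplified integer-only signature rather than Impala's actual one:

  #include <algorithm>
  #include <cstdint>

  // Fraction of the column's [col_low, col_high] stats range that the
  // filter's [filter_min, filter_max] range covers. A ratio close to 1.0
  // means the filter rejects almost nothing and is not worth applying.
  static float OverlapRatio(int64_t filter_min, int64_t filter_max,
      int64_t col_low, int64_t col_high) {
    if (col_high <= col_low) return 1.0f;  // Degenerate stats range.
    int64_t lo = std::max(filter_min, col_low);
    int64_t hi = std::min(filter_max, col_high);
    if (hi < lo) return 0.0f;  // Disjoint: the filter rejects everything.
    return static_cast<float>(hi - lo) / static_cast<float>(col_high - col_low);
  }

For example, a filter of [1, 99000] against column stats of [1, 100000] yields
a ratio of about 0.99; with a 'minmax_filter_threshold' of, say, 0.9 the filter
would be rejected here, while a narrow filter such as [1, 500] (ratio 0.005)
would be kept.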
diff --git a/be/src/exec/filter-context.h b/be/src/exec/filter-context.h
index c84a730..0a118b5 100644
--- a/be/src/exec/filter-context.h
+++ b/be/src/exec/filter-context.h
@@ -146,6 +146,14 @@ struct FilterContext {
   // is updated.
   static bool CheckForAlwaysFalse(const std::string& stats_name,
       const std::vector<FilterContext>& ctxs);
+
+  /// Returns true if 'minmax_filter' is a min-max filter whose range overlaps
+  /// enough with the range defined by the column low and high values in 'desc';
+  /// returns false otherwise. The degree of overlap is measured by the overlap
+  /// ratio, with 'threshold' (query option 'minmax_filter_threshold') used as a
+  /// lower bound for the ratio.
+  static bool ShouldRejectFilterBasedOnColumnStats(const TRuntimeFilterTargetDesc& desc,
+      MinMaxFilter* minmax_filter, float threshold);
 };
 
 }
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 8e35124..e3674d3 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -344,15 +344,19 @@ class HdfsScanner {
     /// Apply the filter at page level only.
     uint8_t enabled_for_page;
 
+    /// Apply the filter at row group level only.
+    uint8_t enabled_for_rowgroup;
+
     /// Padding to ensure structs do not straddle cache-line boundary.
-    uint8_t padding[6];
+    uint8_t padding[5];
 
     LocalFilterStats()
       : considered(0),
         rejected(0),
         total_possible(0),
         enabled_for_row(1),
-        enabled_for_page(1) {}
+        enabled_for_page(1),
+        enabled_for_rowgroup(1) {}
   };
 
   /// Cached runtime filter contexts, one for each filter that applies to this column.
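
The padding arithmetic above can be checked at compile time. A hypothetical
sketch (the counter types below are assumptions; only the three flag bytes and
the five padding bytes come from the patch):

  #include <cstdint>

  struct LocalFilterStatsSketch {
    int64_t considered;            // assumed 8-byte counter
    int64_t rejected;              // assumed 8-byte counter
    int64_t total_possible;        // assumed 8-byte counter
    uint8_t enabled_for_row;       // 1 byte
    uint8_t enabled_for_page;      // 1 byte
    uint8_t enabled_for_rowgroup;  // 1 byte, new in this patch
    uint8_t padding[5];            // 3 flags + 5 pad bytes round up to 8
  };
  // A 32-byte struct packs two per 64-byte cache line, so an array element
  // never straddles a line boundary.
  static_assert(sizeof(LocalFilterStatsSketch) == 32, "unexpected padding");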
diff --git a/be/src/exec/incr-stats-util-test.cc b/be/src/exec/incr-stats-util-test.cc
index 11eb634..e92c0ef 100644
--- a/be/src/exec/incr-stats-util-test.cc
+++ b/be/src/exec/incr-stats-util-test.cc
@@ -71,6 +71,56 @@ TEST(IncrStatsUtilTest, TestEncode) {
   ASSERT_EQ(DecodeNdv(encoded, is_encoded), test);
 }
 
+void checkLowAndHighValueInt(
+    const TColumnStats& stats, int expected_low, int expected_high) {
+  ASSERT_TRUE(stats.low_value.__isset.int_val);
+  ASSERT_TRUE(stats.high_value.__isset.int_val);
+  ASSERT_EQ(expected_low, stats.low_value.int_val);
+  ASSERT_EQ(expected_high, stats.high_value.int_val);
+}
+
+TEST(IncrStatsUtilTest, TestLowAndHighValueInt) {
+  PerColumnStats* stat = new PerColumnStats();
+  TColumnValue lv;
+  TColumnValue hv;
+
+  lv.__set_int_val(10);
+  hv.__set_int_val(20);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
+
+  checkLowAndHighValueInt(stat->ToTColumnStats(), 10, 20);
+
+  lv.__set_int_val(2);
+  hv.__set_int_val(30);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
+  checkLowAndHighValueInt(stat->ToTColumnStats(), 2, 30);
+}
+
+void checkLowAndHighValueShort(
+    const TColumnStats& stats, short expected_low, short expected_high) {
+  ASSERT_TRUE(stats.low_value.__isset.short_val);
+  ASSERT_TRUE(stats.high_value.__isset.short_val);
+  ASSERT_EQ(expected_low, stats.low_value.short_val);
+  ASSERT_EQ(expected_high, stats.high_value.short_val);
+}
+
+TEST(IncrStatsUtilTest, TestLowAndHighValueShort) {
+  PerColumnStats* stat = new PerColumnStats();
+  TColumnValue lv;
+  TColumnValue hv;
+
+  lv.__set_short_val(10);
+  hv.__set_short_val(20);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
+
+  checkLowAndHighValueShort(stat->ToTColumnStats(), 10, 20);
+
+  lv.__set_short_val(14);
+  hv.__set_short_val(30);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
+  checkLowAndHighValueShort(stat->ToTColumnStats(), 10, 30);
+}
+
 /**
  * This test checks the acceptable 'new_num_null' values by the PerColumnStats.Update
  * method. In earlier releases the number of null values were not counted and the
@@ -81,26 +131,28 @@ TEST(IncrStatsUtilTest, TestEncode) {
 TEST(IncrStatsUtilTest, TestNumNullAggregation) {
   PerColumnStats* stat = new PerColumnStats();
   ASSERT_EQ(0, stat->ToTColumnStats().num_nulls);
+  TColumnValue lv;
+  TColumnValue hv;
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 1, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 1, 0, 0, lv, hv);
   ASSERT_EQ(1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
   ASSERT_EQ(1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 2, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 2, 0, 0, lv, hv);
   ASSERT_EQ(3, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, -1, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, -1, 0, 0, lv, hv);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 3, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 3, 0, 0, lv, hv);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, -1, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, -1, 0, 0, lv, hv);
   ASSERT_EQ(-1, stat->ToTColumnStats().num_nulls);
 }
 
@@ -111,16 +163,18 @@ TEST(IncrStatsUtilTest, TestNumNullAggregation) {
 */
 TEST(IncrStatsUtilTest, TestAvgSizehAggregation) {
   PerColumnStats* stat = new PerColumnStats();
+  TColumnValue lv;
+  TColumnValue hv;
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 1, 4, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 1, 4, 0, 0, 0, 0, lv, hv);
   stat->Finalize();
   ASSERT_EQ(4, stat->ToTColumnStats().avg_size);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 2, 7, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 2, 7, 0, 0, 0, 0, lv, hv);
   stat->Finalize();
   ASSERT_EQ(6, stat->ToTColumnStats().avg_size);
 
-  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0);
+  stat->Update(string(AggregateFunctions::DEFAULT_HLL_LEN, 0), 0, 0, 0, 0, 0, 0, lv, hv);
   stat->Finalize();
   ASSERT_EQ(6, stat->ToTColumnStats().avg_size);
 }
diff --git a/be/src/exec/incr-stats-util.cc b/be/src/exec/incr-stats-util.cc
index 763c68d..40a268c 100644
--- a/be/src/exec/incr-stats-util.cc
+++ b/be/src/exec/incr-stats-util.cc
@@ -112,9 +112,48 @@ string DecodeNdv(const string& ndv, bool is_encoded) {
   return decoded_ndv;
 }
 
+#define UPDATE_LOW_VALUE(TYPE)                                                      \
+  if (!(low_value.__isset.TYPE##_val) || value.TYPE##_val < low_value.TYPE##_val) { \
+    low_value.__set_##TYPE##_val(value.TYPE##_val);                                 \
+  }
+
+void PerColumnStats::UpdateLowValue(const impala::TColumnValue& value) {
+  if (value.__isset.double_val) {
+    UPDATE_LOW_VALUE(double);
+  } else if (value.__isset.byte_val) {
+    UPDATE_LOW_VALUE(byte);
+  } else if (value.__isset.int_val) {
+    UPDATE_LOW_VALUE(int);
+  } else if (value.__isset.short_val) {
+    UPDATE_LOW_VALUE(short);
+  } else if (value.__isset.long_val) {
+    UPDATE_LOW_VALUE(long);
+  }
+}
+
+#define UPDATE_HIGH_VALUE(TYPE)                                                       \
+  if (!(high_value.__isset.TYPE##_val) || value.TYPE##_val > high_value.TYPE##_val) { \
+    high_value.__set_##TYPE##_val(value.TYPE##_val);                                  \
+  }
+
+void PerColumnStats::UpdateHighValue(const impala::TColumnValue& value) {
+  if (value.__isset.double_val) {
+    UPDATE_HIGH_VALUE(double);
+  } else if (value.__isset.byte_val) {
+    UPDATE_HIGH_VALUE(byte);
+  } else if (value.__isset.int_val) {
+    UPDATE_HIGH_VALUE(int);
+  } else if (value.__isset.short_val) {
+    UPDATE_HIGH_VALUE(short);
+  } else if (value.__isset.long_val) {
+    UPDATE_HIGH_VALUE(long);
+  }
+}
+
 void PerColumnStats::Update(const string& ndv, int64_t num_new_rows, double new_avg_width,
     int32_t max_new_width, int64_t num_new_nulls, int64_t num_new_trues,
-    int64_t num_new_falses) {
+    int64_t num_new_falses, const impala::TColumnValue& low_value_new,
+    const impala::TColumnValue& high_value_new) {
   DCHECK_EQ(intermediate_ndv.size(), ndv.size()) << "Incompatible intermediate NDVs";
   DCHECK_GE(num_new_rows, 0);
   DCHECK_GE(max_new_width, 0);
@@ -140,6 +179,9 @@ void PerColumnStats::Update(const string& ndv, int64_t num_new_rows, double new_
   max_width = ::max(max_width, max_new_width);
   total_width += (new_avg_width * num_new_rows);
   num_rows += num_new_rows;
+
+  UpdateLowValue(low_value_new);
+  UpdateHighValue(high_value_new);
 }
 
 void PerColumnStats::Finalize() {
@@ -156,13 +198,20 @@ TColumnStats PerColumnStats::ToTColumnStats() const {
   col_stats.__set_avg_size(avg_width);
   col_stats.__set_num_trues(num_trues);
   col_stats.__set_num_falses(num_falses);
+  col_stats.__set_low_value(low_value);
+  col_stats.__set_high_value(high_value);
   return col_stats;
 }
 
 string PerColumnStats::DebugString() const {
+  stringstream ss_low_value;
+  ss_low_value << low_value;
+  stringstream ss_high_value;
+  ss_high_value << high_value;
   return Substitute("ndv: $0, num_nulls: $1, max_width: $2, avg_width: $3, num_rows: "
-                    "$4, num_trues: $5, num_falses: $6",
-      ndv_estimate, num_nulls, max_width, avg_width, num_rows, num_trues, num_falses);
+                    "$4, num_trues: $5, num_falses: $6, low_value: $7, high_value: $8",
+      ndv_estimate, num_nulls, max_width, avg_width, num_rows, num_trues, num_falses,
+      ss_low_value.str(), ss_high_value.str());
 }
 
 namespace impala {
@@ -174,8 +223,8 @@ void FinalizePartitionedColumnStats(const TTableSchema& col_stats_schema,
   // The rowset should have the following schema: for every column in the source table,
-  // seven columns are produced, one row per partition.
+  // nine columns are produced, one row per partition.
   // <ndv buckets>, <num nulls>, <max width>, <avg width>, <count rows>,
-  // <num trues>, <num falses>
-  static const int COLUMNS_PER_STAT = 7;
+  // <num trues>, <num falses>, <low value>, <high value>
+  static const int COLUMNS_PER_STAT = 9;
 
   const int num_cols =
       (col_stats_schema.columns.size() - num_partition_cols) / COLUMNS_PER_STAT;
@@ -208,13 +257,22 @@ void FinalizePartitionedColumnStats(const TTableSchema& col_stats_schema,
         int64_t num_nulls = col_stats_row.colVals[i + 1].i64Val.value;
         int64_t num_trues = col_stats_row.colVals[i + 5].i64Val.value;
         int64_t num_falses = col_stats_row.colVals[i + 6].i64Val.value;
+        TColumnValueHive low_value = col_stats_row.colVals[i + 7];
+        TColumnValueHive high_value = col_stats_row.colVals[i + 8];
+
+        impala::TColumnValue low_value_impala =
+            ConvertToTColumnValue(col_stats_schema.columns[i + 7], low_value);
+        impala::TColumnValue high_value_impala =
+            ConvertToTColumnValue(col_stats_schema.columns[i + 8], high_value);
 
         VLOG(3) << "Updated statistics for column=["
                 << col_stats_schema.columns[i].columnName << "]," << " statistics={"
                 << ndv << "," << num_rows << "," << avg_width << "," << num_trues
-                << "," << max_width << "," << num_nulls << "," << num_falses << "}";
-        stat->Update(
-            ndv, num_rows, avg_width, max_width, num_nulls, num_trues, num_falses);
+                << "," << max_width << "," << num_nulls << "," << num_falses
+                << PrintTColumnValue(low_value_impala) << ","
+                << PrintTColumnValue(high_value_impala) << "}";
+        stat->Update(ndv, num_rows, avg_width, max_width, num_nulls, num_trues,
+            num_falses, low_value_impala, high_value_impala);
 
         // Save the intermediate state per-column, per-partition
         TIntermediateColumnStats int_stats;
@@ -227,6 +285,8 @@ void FinalizePartitionedColumnStats(const TTableSchema& col_stats_schema,
         int_stats.__set_num_rows(num_rows);
         int_stats.__set_num_trues(num_trues);
         int_stats.__set_num_falses(num_falses);
+        int_stats.__set_low_value(low_value_impala);
+        int_stats.__set_high_value(high_value_impala);
 
         part_stat->intermediate_col_stats[col_stats_schema.columns[i].columnName] =
             int_stats;
@@ -283,10 +343,12 @@ void FinalizePartitionedColumnStats(const TTableSchema& col_stats_schema,
               << "statistics={" << int_stats.intermediate_ndv << ","
               << int_stats.num_rows << "," << int_stats.avg_width << ","
               << int_stats.max_width << ","<< int_stats.num_nulls << ","
-              << int_stats.num_trues << "," << int_stats.num_falses << "}";
+              << int_stats.num_trues << "," << int_stats.num_falses << ","
+              << int_stats.low_value << "," << int_stats.high_value << "}";
       stats[i].Update(DecodeNdv(int_stats.intermediate_ndv, int_stats.is_ndv_encoded),
           int_stats.num_rows, int_stats.avg_width, int_stats.max_width,
-          int_stats.num_nulls, int_stats.num_trues, int_stats.num_falses);
+          int_stats.num_nulls, int_stats.num_trues, int_stats.num_falses,
+          int_stats.low_value, int_stats.high_value);
     }
   }
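
To make the new stride concrete (an illustration, not part of the patch): with
COLUMNS_PER_STAT = 9, each analyzed column contributes nine values per
partition row, in the order listed above:

  [ <ndv buckets>, <num nulls>, <max width>, <avg width>, <count rows>,
    <num trues>, <num falses>, <low value>, <high value> ]

so a table with two analyzed columns yields 2 * 9 = 18 stat values per
partition row, plus the partition columns themselves.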
 
diff --git a/be/src/exec/incr-stats-util.h b/be/src/exec/incr-stats-util.h
index 188b7f9..b7f8ced 100644
--- a/be/src/exec/incr-stats-util.h
+++ b/be/src/exec/incr-stats-util.h
@@ -59,6 +59,14 @@ struct PerColumnStats {
   // non-integer value)
   double avg_width;
 
+  // The low value. It is undefined if no field in TColumnValue is set;
+  // otherwise, it is the value of the single field that is set.
+  TColumnValue low_value;
+
+  // The high value. It is undefined if no field in TColumnValue is set;
+  // otherwise, it is the value of the single field that is set.
+  TColumnValue high_value;
+
   PerColumnStats()
     : intermediate_ndv(AggregateFunctions::DEFAULT_HLL_LEN, 0),
       num_nulls(0),
@@ -71,7 +79,8 @@ struct PerColumnStats {
   /// Updates all aggregate statistics with a new set of measurements.
   void Update(const string& ndv, int64_t num_new_rows, double new_avg_width,
       int32_t max_new_width, int64_t num_new_nulls, int64_t num_new_trues,
-      int64_t num_new_falses);
+      int64_t num_new_falses, const impala::TColumnValue& low_value,
+      const impala::TColumnValue& high_value);
 
   /// Performs any stats computations that are not distributive, that is they may not be
   /// computed in part during Update(). After this method returns, ndv_estimate and
@@ -83,6 +92,10 @@ struct PerColumnStats {
 
   /// Returns a string with debug information for this
   string DebugString() const;
+
+  /// Update the low (resp. the high) value if 'value' extends the current range.
+  void UpdateLowValue(const TColumnValue& value);
+  void UpdateHighValue(const TColumnValue& value);
 };
 
 namespace impala {
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 3fc7c79..ee50e7f 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -40,6 +40,7 @@
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/scoped-buffer.h"
+#include "service/hs2-util.h"
 #include "util/dict-encoding.h"
 #include "util/pretty-printer.h"
 #include "util/scope-exit-trigger.h"
@@ -94,6 +95,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     num_cols_counter_(nullptr),
     num_stats_filtered_row_groups_counter_(nullptr),
     num_minmax_filtered_row_groups_counter_(nullptr),
+    num_rowgroups_skipped_by_unuseful_filters_counter_(nullptr),
     num_row_groups_counter_(nullptr),
     num_minmax_filtered_pages_counter_(nullptr),
     num_scanners_with_no_reads_counter_(nullptr),
@@ -116,6 +118,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   num_minmax_filtered_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRuntimeFilteredRowGroups",
           TUnit::UNIT);
+  num_rowgroups_skipped_by_unuseful_filters_counter_ = ADD_COUNTER(
+      scan_node_->runtime_profile(), "NumRowGroupsSkippedByUnusefulFilters", TUnit::UNIT);
   num_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
   num_row_groups_with_page_index_counter_ =
@@ -586,6 +590,36 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
   return Status::OK();
 }
 
+bool HdfsParquetScanner::FilterAlreadyDisabledOrOverlapWithColumnStats(
+    int filter_id, MinMaxFilter* minmax_filter, int idx, float threshold) {
+  const TRuntimeFilterDesc& filter_desc = filter_ctxs_[idx]->filter->filter_desc();
+  const TRuntimeFilterTargetDesc& target_desc = filter_desc.targets[0];
+
+  /// If the filter is always true, not enabled for row groups, or covers too much
+  /// of the range defined by the column min and max stats, disable the use of the
+  /// filter at all levels and proceed to the next predicate.
+  bool filterAlwaysTrue = false;
+  bool columnStatsRejected = false;
+  if ((filterAlwaysTrue = minmax_filter->AlwaysTrue())
+      || !filter_stats_[idx].enabled_for_rowgroup
+      || (columnStatsRejected = FilterContext::ShouldRejectFilterBasedOnColumnStats(
+              target_desc, minmax_filter, threshold))) {
+    filter_stats_[idx].enabled_for_rowgroup = false;
+    filter_stats_[idx].enabled_for_row = false;
+    filter_stats_[idx].enabled_for_page = false;
+    filter_ctxs_[idx]->stats->IncrCounters(FilterStats::ROW_GROUPS_KEY, 1, 1, 0);
+    VLOG(3) << "A filter is determined to be not useful:"
+            << " fid=" << filter_id
+            << ", enabled_for_rowgroup=" << (bool)filter_stats_[idx].enabled_for_rowgroup
+            << ", content=" << minmax_filter->DebugString()
+            << ", target column stats: low=" << PrintTColumnValue(target_desc.low_value)
+            << ", high=" << PrintTColumnValue(target_desc.high_value)
+            << ", threshold=" << threshold;
+    return true;
+  }
+  return false;
+}
+
 Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
     const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
     bool* skip_row_group) {
@@ -611,14 +645,30 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
 
   TMinmaxFilteringLevel::type level = state_->query_options().minmax_filtering_level;
   float threshold = (float)(state_->query_options().minmax_filter_threshold);
+  bool row_group_skipped_by_unuseful_filters = false;
 
   for (auto desc: GetOverlapPredicateDescs()) {
-
     int filter_id = desc.filter_id;
     int slot_idx = desc.slot_index;
-    MinMaxFilter* minmax_filter = FindMinMaxFilter(FindFilterIndex(filter_id));
+    /// Find the index of the filter, which is shared by the filter_ctxs_
+    /// and filter_stats_ data structures.
+    int idx = FindFilterIndex(filter_id);
+    DCHECK(idx >= 0);
+    MinMaxFilter* minmax_filter = FindMinMaxFilter(idx);
 
-    if (!minmax_filter) continue;
+    if (!minmax_filter) {
+      // The filter is not available yet.
+      filter_ctxs_[idx]->stats->IncrCounters(FilterStats::ROW_GROUPS_KEY, 1, 0, 0);
+      continue;
+    }
+
+    if (HdfsParquetScanner::FilterAlreadyDisabledOrOverlapWithColumnStats(
+            filter_id, minmax_filter, idx, threshold)) {
+      // The filter is already disabled or too close to the column min/max stats, ignore
+      // it.
+      row_group_skipped_by_unuseful_filters = true;
+      continue;
+    }
 
     SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[slot_idx];
 
@@ -681,11 +731,6 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
             << RawValue::PrintValue(max_slot, col_type, col_type.scale)
             << ", content=" << minmax_filter->DebugString();
 
-    /// Find the index of the filter that is common in data structure
-    /// filter_ctxs_ and filter_stats_.
-    int idx = FindFilterIndex(filter_id);
-    DCHECK(idx >= 0);
-
     /// If not overlapping with this particular filter, the row group can be filtered
     /// out safely.
     if (overlap_ratio == 0.0) {
@@ -712,6 +757,10 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
     filter_ctxs_[idx]->stats->IncrCounters(FilterStats::ROW_GROUPS_KEY, 1, 1, 0);
   }
 
+  if (row_group_skipped_by_unuseful_filters) {
+    COUNTER_ADD(num_rowgroups_skipped_by_unuseful_filters_counter_, 1);
+  }
+
   return Status::OK();
 }
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 9f5cd69..4e04a8d 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -465,6 +465,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// and HJ min/max filters.
   RuntimeProfile::Counter* num_minmax_filtered_row_groups_counter_;
 
+  /// Number of row groups that are skipped by unuseful filters.
+  RuntimeProfile::Counter* num_rowgroups_skipped_by_unuseful_filters_counter_;
+
   /// Number of row groups that need to be read.
   RuntimeProfile::Counter* num_row_groups_counter_;
 
@@ -538,6 +541,17 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
     const parquet::FileMetaData& file_metadata, const parquet::RowGroup& row_group,
     bool* skip_row_group);
 
+  /// Return true if filter 'minmax_filter' with filter id 'filter_id' is too close
+  /// to the column min/max stats available at the target desc entry targets[0] in
+  /// 'filter_ctxs_[idx]', using 'threshold' as the cutoff. Return false otherwise.
+  ///
+  /// Side effects: clears enabled_for_rowgroup, enabled_for_row and enabled_for_page
+  /// in filter_stats_[idx] and updates FilterStats::ROW_GROUPS_KEY in
+  /// filter_ctxs_[idx]; the caller uses the result to bump
+  /// num_rowgroups_skipped_by_unuseful_filters_counter_ in this scanner.
+  bool FilterAlreadyDisabledOrOverlapWithColumnStats(
+      int filter_id, MinMaxFilter* minmax_filter, int idx, float threshold);
+
   /// Detect if a column is a collection or missing for a column chunk described by a
   /// schema path in a slot descriptor 'slot_desc'.
   /// On return:
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index ee21e7e..57fbeb6 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -35,6 +35,7 @@
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-filter.h"
 #include "runtime/runtime-state.h"
+#include "service/hs2-util.h"
 #include "util/bloom-filter.h"
 #include "util/cyclic-barrier.h"
 #include "util/debug-util.h"
@@ -924,14 +925,43 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) {
   VLOG(3) << "Join builder (join_node_id_=" << join_node_id_ << ") publishing "
           << filter_ctxs_.size() << " filters.";
   int32_t num_enabled_filters = 0;
+  float threshold = (float)(runtime_state_->query_options().minmax_filter_threshold);
   for (const FilterContext& ctx : filter_ctxs_) {
     BloomFilter* bloom_filter = nullptr;
     if (ctx.local_bloom_filter != nullptr) {
       bloom_filter = ctx.local_bloom_filter;
       ++num_enabled_filters;
-    } else if (ctx.local_min_max_filter != nullptr
-        && !ctx.local_min_max_filter->AlwaysTrue()) {
-      ++num_enabled_filters;
+    } else if (ctx.local_min_max_filter != nullptr) {
+      /// Apply the column min/max stats (if applicable) to shut down the min/max
+      /// filter early by setting the always-true flag on the filter. Do this only
+      /// if the min/max filter's range is too close to the column stats of all
+      /// target scan columns.
+      const TRuntimeFilterDesc& filter_desc = ctx.filter->filter_desc();
+      VLOG(3) << "Check out the usefulness of the local minmax filter:"
+              << " id=" << ctx.filter->id()
+              << ", details=" << ctx.local_min_max_filter->DebugString()
+              << ", column stats:"
+              << " low=" << PrintTColumnValue(filter_desc.targets[0].low_value)
+              << ", high=" << PrintTColumnValue(filter_desc.targets[0].high_value)
+              << ", threshold=" << threshold
+              << ", #targets=" << filter_desc.targets.size();
+      bool all_overlap = true;
+      for (auto target_desc : filter_desc.targets) {
+        if (!FilterContext::ShouldRejectFilterBasedOnColumnStats(
+                target_desc, ctx.local_min_max_filter, threshold)) {
+          all_overlap = false;
+          break;
+        }
+      }
+      if (all_overlap) {
+        ctx.local_min_max_filter->SetAlwaysTrue();
+        VLOG(3) << "The local minmax filter is set to always true:"
+                << " id=" << ctx.filter->id();
+      }
+
+      if (!ctx.local_min_max_filter->AlwaysTrue()) {
+        ++num_enabled_filters;
+      }
     }
 
     runtime_state_->filter_bank()->UpdateFilterFromLocal(
diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc
index 01c7e2e..1870b34 100644
--- a/be/src/service/hs2-util.cc
+++ b/be/src/service/hs2-util.cc
@@ -19,6 +19,7 @@
 
 #include "common/logging.h"
 #include "exprs/scalar-expr-evaluator.h"
+#include "runtime/date-value.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
@@ -601,8 +602,7 @@ void PrintVal(const apache::hive::service::cli::thrift::TByteValue& val, ostream
   }
 }
 
-void impala::PrintTColumnValue(
-    const apache::hive::service::cli::thrift::TColumnValue& colval, stringstream* out) {
+void impala::PrintTColumnValue(const thrift::TColumnValue& colval, stringstream* out) {
   if (colval.__isset.boolVal) {
     if (colval.boolVal.__isset.value) {
       (*out) << ((colval.boolVal.value) ? "true" : "false");
@@ -625,3 +625,113 @@ void impala::PrintTColumnValue(
     (*out) << "NULL";
   }
 }
+
+TColumnValue impala::ConvertToTColumnValue(
+    const thrift::TColumnDesc& desc, const thrift::TColumnValue& hive_colval) {
+  // By default, all values in the Impala TColumnValue are unset. A value is set
+  // only if the corresponding field in the Hive version is present and not null.
+  TColumnValue colval;
+  if (hive_colval.__isset.boolVal && hive_colval.boolVal.__isset.value) {
+    colval.__set_bool_val(hive_colval.boolVal.value);
+  } else if (hive_colval.__isset.doubleVal && hive_colval.doubleVal.__isset.value) {
+    colval.__set_double_val(hive_colval.doubleVal.value);
+  } else if (hive_colval.__isset.byteVal && hive_colval.byteVal.__isset.value) {
+    colval.__set_byte_val(hive_colval.byteVal.value);
+  } else if (hive_colval.__isset.i32Val && hive_colval.i32Val.__isset.value) {
+    colval.__set_int_val(hive_colval.i32Val.value);
+  } else if (hive_colval.__isset.i16Val && hive_colval.i16Val.__isset.value) {
+    colval.__set_short_val(hive_colval.i16Val.value);
+  } else if (hive_colval.__isset.i64Val && hive_colval.i64Val.__isset.value) {
+    colval.__set_long_val(hive_colval.i64Val.value);
+  } else if (hive_colval.__isset.stringVal && hive_colval.stringVal.__isset.value) {
+    switch (desc.typeDesc.types[0].primitiveEntry.type) {
+      // For Hive date type, the value is represented as a string, such as '2020-01-01'.
+      // Convert the string to Epoch days.
+      case thrift::TTypeId::DATE_TYPE:
+        {
+          DateValue d =
+              DateValue::ParseSimpleDateFormat(hive_colval.stringVal.value, false);
+          colval.__set_date_val(d.Value());
+        }
+        break;
+      // For Hive decimal type, the value is represented as a string, such as
+      // '1.234567'. Its precision and scale are contained in desc as type qualifiers.
+      case thrift::TTypeId::DECIMAL_TYPE:
+        {
+          const std::map<std::string, thrift::TTypeQualifierValue>& map =
+              desc.typeDesc.types[0].primitiveEntry.typeQualifiers.qualifiers;
+          auto it = map.find("precision");
+          if (it == map.end()) {
+            DCHECK(false) << "Unable to find precision";
+          }
+          int precision = it->second.i32Value;
+
+          it = map.find("scale");
+          if (it == map.end()) {
+            DCHECK(false) << "Unable to find scale";
+          }
+          int scale = it->second.i32Value;
+
+          VLOG(3) << "Decimal in hive_colval: value=" << hive_colval.stringVal.value
+                  << ", precision=" << precision
+                  << ", scale=" << scale;
+
+          colval.__set_decimal_val(hive_colval.stringVal.value);
+        }
+        break;
+      case thrift::TTypeId::STRING_TYPE:
+        colval.__set_string_val(hive_colval.stringVal.value);
+        break;
+      default:
+        DCHECK(false) << "Unsupported conversion for hive type "
+                      << desc.typeDesc.types[0];
+    }
+  }
+  return colval;
+}
+
+void impala::PrintTColumnValue(const impala::TColumnValue& value, stringstream* out) {
+  if (value.__isset.bool_val) {
+    *out << value.bool_val;
+  } else if (value.__isset.double_val) {
+    *out << value.double_val;
+  } else if (value.__isset.byte_val) {
+    *out << value.byte_val;
+  } else if (value.__isset.int_val) {
+    *out << value.int_val;
+  } else if (value.__isset.short_val) {
+    *out << value.short_val;
+  } else if (value.__isset.long_val) {
+    *out << value.long_val;
+  } else if (value.__isset.string_val) {
+    *out << value.string_val;
+  } else if (value.__isset.binary_val) {
+    *out << value.binary_val;
+  } else if (value.__isset.timestamp_val) {
+    *out << value.timestamp_val;
+  } else if (value.__isset.decimal_val) {
+    *out << value.decimal_val;
+  } else if (value.__isset.date_val) {
+    *out << value.date_val;
+  }
+}
+
+string impala::PrintTColumnValue(const impala::TColumnValue& value) {
+  std::stringstream ss;
+  PrintTColumnValue(value, &ss);
+  return ss.str();
+}
+
+bool impala::isOneFieldSet(const impala::TColumnValue& value) {
+  return (value.__isset.bool_val ||
+          value.__isset.double_val ||
+          value.__isset.byte_val ||
+          value.__isset.int_val ||
+          value.__isset.short_val ||
+          value.__isset.long_val ||
+          value.__isset.string_val ||
+          value.__isset.binary_val ||
+          value.__isset.timestamp_val ||
+          value.__isset.decimal_val ||
+          value.__isset.date_val);
+}
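
A hypothetical usage sketch of the new conversion helper for a DATE column
('date_desc' below is an assumed thrift::TColumnDesc describing a DATE_TYPE
column; only ConvertToTColumnValue() itself is from the patch):

  // Hive serializes DATE stats as 'yyyy-MM-dd' strings; the helper parses
  // the string and stores days since 1970-01-01 in date_val.
  impala::TColumnValueHive hive_val;
  hive_val.stringVal.__set_value("2020-01-01");
  hive_val.__isset.stringVal = true;
  impala::TColumnValue v = impala::ConvertToTColumnValue(date_desc, hive_val);
  // Now v.__isset.date_val is true and v.date_val == 18262
  // (2020-01-01 is 18262 days after the epoch).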
diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h
index 4f0f973..083059c 100644
--- a/be/src/service/hs2-util.h
+++ b/be/src/service/hs2-util.h
@@ -17,12 +17,15 @@
 
 #include "gen-cpp/ImpalaHiveServer2Service.h"
 #include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/TCLIService_types.h"
 
 namespace impala {
 
 class RowBatch;
 class ScalarExprEvaluator;
 
+typedef apache::hive::service::cli::thrift::TColumnValue TColumnValueHive;
+
 /// Utility methods for converting from Impala (either an Expr result or a TColumnValue)
 /// to Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->).
 
@@ -54,4 +57,15 @@ void StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added, uint32_t sta
 void PrintTColumnValue(const apache::hive::service::cli::thrift::TColumnValue& colval,
     std::stringstream* out);
 
+/// Utility method for converting from Hive TColumnValue to Impala TColumnValue.
+TColumnValue ConvertToTColumnValue(
+    const apache::hive::service::cli::thrift::TColumnDesc& desc,
+    const apache::hive::service::cli::thrift::TColumnValue& hive_colval);
+
+/// Utility method for printing Impala TColumnValue.
+void PrintTColumnValue(const impala::TColumnValue& colval, std::stringstream* out);
+std::string PrintTColumnValue(const impala::TColumnValue& colval);
+
+/// Return true if at least one field in 'value' is set. Return false otherwise.
+bool isOneFieldSet(const impala::TColumnValue& value);
 }
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 9960fd3..35b0610 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -601,6 +601,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_compute_stats_min_sample_size(min_sample_size);
         break;
       }
+      case TImpalaQueryOptions::COMPUTE_COLUMN_MINMAX_STATS: {
+        query_options->__set_compute_column_minmax_stats(IsTrue(value));
+        break;
+      }
+      case TImpalaQueryOptions::SHOW_COLUMN_MINMAX_STATS: {
+        query_options->__set_show_column_minmax_stats(IsTrue(value));
+        break;
+      }
       case TImpalaQueryOptions::EXEC_TIME_LIMIT_S: {
         StringParser::ParseResult result;
         const int32_t time_limit =
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index cc794e4..c4cda49 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MINMAX_FILTERING_LEVEL + 1);\
+      TImpalaQueryOptions::SHOW_COLUMN_MINMAX_STATS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -238,6 +238,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(minmax_filtering_level, MINMAX_FILTERING_LEVEL,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(compute_column_minmax_stats, COMPUTE_COLUMN_MINMAX_STATS,\
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index 1e32d02..df4e82d 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -185,6 +185,13 @@ TEST(MinMaxFilterTest, TestNumericMinMaxFilter) {
   EXPECT_FALSE(f1->AlwaysTrue());
   EXPECT_FALSE(f1->AlwaysFalse());
 
+  MinMaxFilterPB pFilter3;
+  pFilter3.mutable_min()->set_int_val(0);
+  pFilter3.mutable_max()->set_int_val(0);
+  pFilter2.set_always_true(true);
+  MinMaxFilter::Or(pFilter2, &pFilter3, int_type);
+  EXPECT_EQ(pFilter3.always_true(), true);
+
   int_filter->Close();
   empty_filter->Close();
   int_filter2->Close();
@@ -437,6 +444,16 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   EXPECT_TRUE(always_true_filter->AlwaysTrue());
   EXPECT_FALSE(always_true_filter->AlwaysFalse());
 
+  // Test that the always-true flag survives Or(), ToProtobuf(), and reconstruction.
+  MinMaxFilter* f3 = MinMaxFilter::Create(string_type, &obj_pool, &mem_tracker);
+  f3->SetAlwaysTrue();
+  f1->Or(*f3);
+  EXPECT_TRUE(f3->AlwaysTrue());
+  f3->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_true());
+  StringMinMaxFilter f4(pFilter, &mem_tracker);
+  EXPECT_TRUE(f4.AlwaysTrue());
+
   filter->Close();
   empty_filter->Close();
   filter2->Close();
@@ -445,6 +462,7 @@ TEST(MinMaxFilterTest, TestStringMinMaxFilter) {
   f1->Close();
   f2->Close();
   always_false->Close();
+  f3->Close();
 }
 
 static TimestampValue ParseSimpleTimestamp(const char* s) {
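
The two new test fragments pin the same invariant from both ends: Or()-ing an
always-true filter into anything must yield an always-true result, and the flag
must survive serialization. A reduced, self-contained C++17 sketch of that
lattice, assuming a filter is either empty (always_false), unbounded
(always_true), or a closed interval; the committed macros carry per-type payloads
that this sketch elides:

    #include <algorithm>
    #include <cassert>
    #include <limits>

    // Stand-in for MinMaxFilterPB, reduced to an int64 interval plus flags.
    struct FilterPB {
      long long min = std::numeric_limits<long long>::max();
      long long max = std::numeric_limits<long long>::lowest();
      bool always_false = true;  // empty: rejects everything
      bool always_true = false;  // unbounded: accepts everything
    };

    // Or(): always_false is the identity, always_true absorbs, and otherwise
    // the interval widens.
    void Or(const FilterPB& in, FilterPB* out) {
      if (in.always_false) return;
      if (out->always_false) { *out = in; return; }
      if (in.always_true || out->always_true) {
        out->always_true = true;
        return;
      }
      out->min = std::min(out->min, in.min);
      out->max = std::max(out->max, in.max);
    }

    int main() {
      FilterPB target;
      target.always_false = false; target.min = 0; target.max = 0;
      FilterPB unbounded;
      unbounded.always_false = false; unbounded.always_true = true;
      Or(unbounded, &target);
      assert(target.always_true);  // same expectation as the new test above
    }
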
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index 78fa51d..4250939 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -105,10 +105,11 @@ int64_t GetIntTypeValue(const ColumnType& type, const void* value) {
   const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                                    \
       "class.impala::" #NAME "MinMaxFilter";                                           \
   NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) {             \
-    DCHECK(!protobuf.always_true());                                                   \
     if (protobuf.always_false()) {                                                     \
       min_ = numeric_limits<TYPE>::max();                                              \
       max_ = numeric_limits<TYPE>::lowest();                                           \
+    } else if (protobuf.always_true()) {                                               \
+      always_true_ = true;                                                             \
     } else {                                                                           \
       DCHECK(protobuf.has_min());                                                      \
       DCHECK(protobuf.has_max());                                                      \
@@ -122,17 +123,17 @@ int64_t GetIntTypeValue(const ColumnType& type, const void* value) {
     return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                                       \
   }                                                                                    \
   void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {                \
-    if (!AlwaysFalse()) {                                                              \
+    if (!AlwaysFalse() && !AlwaysTrue()) {                                             \
       protobuf->mutable_min()->set_##PROTOBUF_TYPE##_val(min_);                        \
       protobuf->mutable_max()->set_##PROTOBUF_TYPE##_val(max_);                        \
     }                                                                                  \
     protobuf->set_always_false(AlwaysFalse());                                         \
-    protobuf->set_always_true(false);                                                  \
+    protobuf->set_always_true(AlwaysTrue());                                           \
   }                                                                                    \
   string NAME##MinMaxFilter::DebugString() const {                                     \
     stringstream out;                                                                  \
     out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_                    \
-        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")"              \
+        << ", always_false=" << (AlwaysFalse() ? "true" : "false")                     \
         << ", always_true=" << (AlwaysTrue() ? "true" : "false") << ")";               \
     return out.str();                                                                  \
   }                                                                                    \
@@ -141,6 +142,8 @@ int64_t GetIntTypeValue(const ColumnType& type, const void* value) {
      out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val()); \
      out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val()); \
       out->set_always_false(false);                                                    \
+    } else if (in.always_true() || out->always_true()) {                               \
+      out->set_always_true(true);                                                      \
     } else {                                                                           \
       out->mutable_min()->set_##PROTOBUF_TYPE##_val(                                   \
           std::min(in.min().PROTOBUF_TYPE##_val(), out->min().PROTOBUF_TYPE##_val())); \
@@ -151,7 +154,7 @@ int64_t GetIntTypeValue(const ColumnType& type, const void* value) {
   void NAME##MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {       \
     out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());     \
     out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());     \
-  }                                                                                    \
+  }
 
 NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
 NUMERIC_MIN_MAX_FILTER_FUNCS(TinyInt, int8_t, byte, TINYINT);
@@ -408,14 +411,18 @@ void StringMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
     out->mutable_max()->set_string_val(in.max().string_val());
     out->set_always_false(false);
   } else {
-    StringValue in_min_val = StringValue(in.min().string_val());
-    StringValue out_min_val = StringValue(out->min().string_val());
-    if (in_min_val < out_min_val)
-      out->mutable_min()->set_string_val(in.min().string_val());
-    StringValue in_max_val = StringValue(in.max().string_val());
-    StringValue out_max_val = StringValue(out->max().string_val());
-    if (in_max_val > out_max_val)
-      out->mutable_max()->set_string_val(in.max().string_val());
+    if (in.always_true() || out->always_true()) {
+      out->set_always_true(true);
+    } else {
+      StringValue in_min_val = StringValue(in.min().string_val());
+      StringValue out_min_val = StringValue(out->min().string_val());
+      if (in_min_val < out_min_val)
+        out->mutable_min()->set_string_val(in.min().string_val());
+      StringValue in_max_val = StringValue(in.max().string_val());
+      StringValue out_max_val = StringValue(out->max().string_val());
+      if (in_max_val > out_max_val)
+        out->mutable_max()->set_string_val(in.max().string_val());
+    }
   }
 }
 
@@ -476,7 +483,8 @@ float StringMinMaxFilter::ComputeOverlapRatio(
       "class.impala::" #NAME "MinMaxFilter";                                           \
   NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) {             \
     always_false_ = protobuf.always_false();                                           \
-    if (!always_false_) {                                                              \
+    always_true_ = protobuf.always_true();                                             \
+    if (!always_false_ && !always_true_) {                                             \
       DCHECK(protobuf.min().has_##PROTOBUF_TYPE##_val());                              \
       DCHECK(protobuf.max().has_##PROTOBUF_TYPE##_val());                              \
       min_ = TYPE::FromColumnValuePB(protobuf.min());                                  \
@@ -487,17 +495,18 @@ float StringMinMaxFilter::ComputeOverlapRatio(
     return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                                       \
   }                                                                                    \
   void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {                \
-    if (!always_false_) {                                                              \
+    if (!always_false_ && !always_true_) {                                             \
       min_.ToColumnValuePB(protobuf->mutable_min());                                   \
       max_.ToColumnValuePB(protobuf->mutable_max());                                   \
     }                                                                                  \
     protobuf->set_always_false(always_false_);                                         \
-    protobuf->set_always_true(false);                                                  \
+    protobuf->set_always_true(always_true_);                                           \
   }                                                                                    \
   string NAME##MinMaxFilter::DebugString() const {                                     \
     stringstream out;                                                                  \
     out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_                    \
-        << ", always_false=" << (always_false_ ? "true" : "false") << ")";             \
+        << ", always_false=" << (always_false_ ? "true" : "false")                     \
+        << ", always_true=" << (always_false_ ? "true" : "false") << ")";              \
     return out.str();                                                                  \
   }                                                                                    \
   void NAME##MinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {         \
@@ -505,6 +514,8 @@ float StringMinMaxFilter::ComputeOverlapRatio(
       out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());   \
       out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());   \
       out->set_always_false(false);                                                    \
+    } else if (in.always_true() || out->always_true()) {                               \
+      out->set_always_true(true);                                                      \
     } else {                                                                           \
       TYPE in_min_val = TYPE::FromColumnValuePB(in.min());                             \
       TYPE out_min_val = TYPE::FromColumnValuePB(out->min());                          \
@@ -586,7 +597,8 @@ const char* DecimalMinMaxFilter::LLVM_CLASS_NAME = "class.impala::DecimalMinMaxF
 DecimalMinMaxFilter::DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision)
   : size_(ColumnType::GetDecimalByteSize(precision)),
     always_false_(protobuf.always_false()) {
-  if (!always_false_) {
+  always_true_ = protobuf.always_true();
+  if (!always_false_ && !always_true_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
         DECIMAL_SET_MINMAX(4);
@@ -616,7 +628,7 @@ PrimitiveType DecimalMinMaxFilter::type() {
 // Construct a thrift min-max filter.  Will be called by the executor
 // to be sent to the coordinator
 void DecimalMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
-  if (!always_false_) {
+  if (!always_false_ && !always_true_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
         DECIMAL_TO_PROTOBUF(4);
@@ -632,7 +644,7 @@ void DecimalMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
     }
   }
   protobuf->set_always_false(always_false_);
-  protobuf->set_always_true(false);
+  protobuf->set_always_true(always_true_);
 }
 
 void DecimalMinMaxFilter::Insert(const void* val) {
@@ -652,10 +664,11 @@ void DecimalMinMaxFilter::Insert(const void* val) {
   }
 }
 
-#define DECIMAL_DEBUG_STRING(SIZE)                                                \
-  do {                                                                            \
-    out << "DecimalMinMaxFilter(min=" << min##SIZE##_ << ", max=" << max##SIZE##_ \
-        << " always_false=" << (always_false_ ? "true" : "false") << ")";         \
+#define DECIMAL_DEBUG_STRING(SIZE)                                                 \
+  do {                                                                             \
+    out << "DecimalMinMaxFilter(min=" << min##SIZE##_ << ", max=" << max##SIZE##_  \
+        << ", always_false=" << (always_false_ ? "true" : "false")                 \
+        << ", always_true=" << (always_false_ ? "true" : "false") << ")";          \
   } while (false)
 
 string DecimalMinMaxFilter::DebugString() const {
@@ -696,6 +709,8 @@ void DecimalMinMaxFilter::Or(
     out->mutable_min()->set_decimal_val(in.min().decimal_val());
     out->mutable_max()->set_decimal_val(in.max().decimal_val());
     out->set_always_false(false);
+  } else if (in.always_true() || out->always_true()) {
+    out->set_always_true(true);
   } else {
     int size = ColumnType::GetDecimalByteSize(precision);
     switch (size) {
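
Taken together, the constructor and ToProtobuf() changes in this file make
always_true a first-class serialized state: previously ToProtobuf() pinned the
flag to false and the constructors DCHECKed against it. A hedged sketch of the
round trip on a simplified int filter (illustrative types, not the committed
classes; in this sketch an always-true filter is deliberately treated as
non-empty):

    #include <cassert>
    #include <limits>

    struct FilterPB {
      bool has_bounds = false;
      int min = 0, max = 0;
      bool always_false = false, always_true = false;
    };

    class IntMinMaxFilter {
     public:
      IntMinMaxFilter() = default;
      explicit IntMinMaxFilter(const FilterPB& pb) {
        if (pb.always_false) {
          // Keep the default empty interval.
        } else if (pb.always_true) {
          always_true_ = true;  // restored instead of DCHECKed away
        } else {
          min_ = pb.min; max_ = pb.max;
        }
      }
      void SetAlwaysTrue() { always_true_ = true; }
      bool AlwaysFalse() const { return !always_true_ && min_ > max_; }
      bool AlwaysTrue() const { return always_true_; }
      void ToProtobuf(FilterPB* pb) const {
        if (!AlwaysFalse() && !AlwaysTrue()) {  // only ship real bounds
          pb->min = min_; pb->max = max_; pb->has_bounds = true;
        }
        pb->always_false = AlwaysFalse();
        pb->always_true = AlwaysTrue();         // was hard-coded to false
      }
     private:
      int min_ = std::numeric_limits<int>::max();
      int max_ = std::numeric_limits<int>::lowest();
      bool always_true_ = false;
    };

    int main() {
      IntMinMaxFilter f;
      f.SetAlwaysTrue();
      FilterPB pb;
      f.ToProtobuf(&pb);
      IntMinMaxFilter g(pb);
      assert(g.AlwaysTrue() && !pb.has_bounds);
    }
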
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index 9489079..f434fff 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -120,10 +120,10 @@ class MinMaxFilter {
   virtual float ComputeOverlapRatio(
       const ColumnType& type, void* data_min, void* data_max) = 0;
 
- protected:
   /// Makes this filter always return true.
   virtual void SetAlwaysTrue() { always_true_ = true; }
 
+ protected:
   bool always_true_;
 };
 
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index e902a55..90d5a7b 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -18,6 +18,7 @@
 namespace cpp impala
 namespace java org.apache.impala.thrift
 
+include "Data.thrift"
 include "Exprs.thrift"
 include "Status.thrift"
 include "Types.thrift"
@@ -199,6 +200,10 @@ struct TColumnStats {
   // Estimated number of true and false value for boolean type
   5: required i64 num_trues
   6: required i64 num_falses
+
+  // The low and high values of the column, when known.
+  7: optional Data.TColumnValue low_value
+  8: optional Data.TColumnValue high_value
 }
 
 // Intermediate state for the computation of per-column stats. Impala can aggregate these
@@ -225,6 +230,10 @@ struct TIntermediateColumnStats {
   // The number of true and false value, of the column
   7: required i64 num_trues
   8: required i64 num_falses
+
+  // The low and high values of the column, when known.
+  9: optional Data.TColumnValue low_value
+  10: optional Data.TColumnValue high_value
 }
 
 // Per-partition statistics
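
In both stats structs the bounds are optional on purpose: an unset
low_value/high_value means "unknown" (stats were never computed with the new
option, or the type cannot hold min/max in HMS), which consumers must distinguish
from a known extreme value. A small C++17 analogue of that tri-state handling,
with std::optional standing in for Thrift's optional fields:

    #include <iostream>
    #include <optional>

    struct ColumnBounds {
      std::optional<long long> low, high;  // unset == unknown, not +/-infinity
    };

    // Pruning is only safe when both bounds are actually known.
    bool CanProveDisjoint(const ColumnBounds& b, long long fmin, long long fmax) {
      if (!b.low || !b.high) return false;  // unknown bounds prove nothing
      return fmax < *b.low || fmin > *b.high;
    }

    int main() {
      ColumnBounds unknown;
      std::cout << CanProveDisjoint(unknown, 0, 10) << '\n';  // 0
      ColumnBounds known{5, 20};
      std::cout << CanProveDisjoint(known, 30, 40) << '\n';   // 1
    }
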
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 62899bf..ff7d936 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -220,6 +220,7 @@ enum TShowStatsOp {
 struct TShowStatsParams {
   1: TShowStatsOp op
   2: CatalogObjects.TTableName table_name
+  3: optional bool show_column_minmax_stats
 }
 
 // Parameters for DESCRIBE HISTORY command
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 80eb80e..c8c10e3 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -645,6 +645,12 @@ enum TImpalaQueryOptions {
   //     PAGE      - apply to row groups and pages only.
   //     ROW       - apply to row groups, pages and rows.
   MINMAX_FILTERING_LEVEL = 123
+
+  // If true, also compute each column's min and max values during COMPUTE STATS.
+  COMPUTE_COLUMN_MINMAX_STATS = 124
+
+  // If true, include the min and max column stats in SHOW COLUMN STATS output.
+  SHOW_COLUMN_MINMAX_STATS = 125
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 8daad7a..44d242a 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -25,6 +25,7 @@ namespace cpp impala
 namespace java org.apache.impala.thrift
 
 include "CatalogObjects.thrift"
+include "Data.thrift"
 include "ExecStats.thrift"
 include "Exprs.thrift"
 include "Types.thrift"
@@ -112,6 +113,10 @@ struct TRuntimeFilterTargetDesc {
   // type of the targeted column.
   6: optional string kudu_col_name
   7: optional Types.TColumnType kudu_col_type;
+
+  // The low and high values as seen in the column stats of the targeted column.
+  8: optional Data.TColumnValue low_value
+  9: optional Data.TColumnValue high_value
 }
 
 enum TRuntimeFilterType {
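
These two fields are what lets the backend judge a min/max filter against the
scanned column's full value range: a filter whose interval already covers
[low_value, high_value] cannot eliminate anything and can be disabled early. A
minimal sketch of that comparison on plain integers (illustrative only; the real
check goes through TColumnValue and per-type comparators):

    #include <cassert>

    // A filter whose [min, max] encloses the column's stats range cannot
    // eliminate anything; the scanner may as well disable it.
    bool FilterCoversColumn(long long filter_min, long long filter_max,
                            long long col_low, long long col_high) {
      return filter_min <= col_low && col_high <= filter_max;
    }

    int main() {
      assert(FilterCoversColumn(0, 100, 10, 90));  // not selective
      assert(!FilterCoversColumn(0, 50, 10, 90));  // may still prune rows
    }
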
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index ce0d654..d2d8e7d 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -478,6 +478,12 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   124: optional PlanNodes.TMinmaxFilteringLevel minmax_filtering_level =
       PlanNodes.TMinmaxFilteringLevel.ROW_GROUP;
+
+  // See comment in ImpalaService.thrift
+  125: optional bool compute_column_minmax_stats = false;
+
+  // See comment in ImpalaService.thrift
+  126: optional bool show_column_minmax_stats = false;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index db3dd2d..6c69bb5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -55,6 +55,7 @@ import org.apache.impala.thrift.TGetPartitionStatsResponse;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUnit;
+import org.apache.impala.util.MetaStoreUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Joiner;
@@ -245,6 +246,13 @@ public class ComputeStatsStmt extends StatementBase {
     int startColIdx = (table_ instanceof FeHBaseTable) ? 0 :
         table_.getNumClusteringCols();
 
+    // Compute min/max only when the query option is set and the table has at
+    // least one Parquet-based partition.
+    boolean computeMinMax = analyzer.getQueryCtx()
+                                .getClient_request()
+                                .getQuery_options()
+                                .isCompute_column_minmax_stats()
+        && hasAtLeastOneParquetPartition();
+
     for (int i = startColIdx; i < table_.getColumns().size(); ++i) {
       Column c = table_.getColumns().get(i);
       if (validatedColumnWhitelist_ != null && !validatedColumnWhitelist_.contains(c)) {
@@ -298,6 +306,24 @@ public class ComputeStatsStmt extends StatementBase {
         columnStatsSelectList.add("NULL");
         columnStatsSelectList.add("NULL");
       }
+
+      // Finally, compute the min and max. NULLs in the column are ignored unless
+      // all values are NULL, in which case a NULL value is produced.
+      //
+      // Do this only for integer, floating-point, DECIMAL and DATE types, as they
+      // can be stored in LongColumnStatsData, DoubleColumnStatsData,
+      // DecimalColumnStatsData or DateColumnStatsData in HMS.
+      String min_expr = null;
+      String max_expr = null;
+      if (computeMinMax && MetaStoreUtil.canStoreMinmaxInHMS(type)) {
+        min_expr = "MIN(" + colRefSql + ")";
+        max_expr = "MAX(" + colRefSql + ")";
+      } else {
+        min_expr = "NULL";
+        max_expr = "NULL";
+      }
+      columnStatsSelectList.add(min_expr);
+      columnStatsSelectList.add(max_expr);
     }
     return columnStatsSelectList;
   }
@@ -925,6 +951,33 @@ public class ComputeStatsStmt extends StatementBase {
     return true;
   }
 
+  /**
+   * Returns true if this statement computes stats on a table with at least one Parquet
+   * partition, false otherwise.
+   */
+  public boolean hasAtLeastOneParquetPartition() {
+    if (!(table_ instanceof FeFsTable)) return false;
+    FeFsTable hdfsTable = (FeFsTable) table_;
+    Set<Long> partitionIds = hdfsTable.getPartitionIds();
+    if (partitionIds.size() > 0) {
+      for (Long partitionId : partitionIds) {
+        FeFsPartition partition = FeCatalogUtils.loadPartition(hdfsTable, partitionId);
+        if (partition.getFileFormat().isParquetBased()) {
+          return true;
+        }
+      }
+    } else {
+      Collection<? extends FeFsPartition> allPartitions =
+          FeCatalogUtils.loadAllPartitions(hdfsTable);
+      for (FeFsPartition partition : allPartitions) {
+        if (partition.getFileFormat().isParquetBased()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   @Override
   public String toSql(ToSqlOptions options) {
     if (!isIncremental_) {
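
Note how the select list above stays positionally stable: when min/max
computation is disabled, or the column's type cannot be stored in HMS, the two
slots are filled with NULL rather than dropped, so downstream aggregation can
address child-query columns by index. A C++17 sketch of just that branching (the
committed code is Java; can_store_in_hms stands in for
MetaStoreUtil.canStoreMinmaxInHMS):

    #include <iostream>
    #include <string>
    #include <utility>

    // Returns the MIN/MAX expressions for one column of the child query, or
    // NULL placeholders that keep the select list positionally stable.
    std::pair<std::string, std::string> MinMaxExprs(
        bool compute_minmax, bool can_store_in_hms, const std::string& col) {
      if (compute_minmax && can_store_in_hms) {
        return {"MIN(" + col + ")", "MAX(" + col + ")"};
      }
      return {"NULL", "NULL"};
    }

    int main() {
      auto [min_e, max_e] = MinMaxExprs(true, true, "l_orderkey");
      std::cout << min_e << ", " << max_e << '\n';  // MIN(...), MAX(...)
      auto [n1, n2] = MinMaxExprs(true, false, "l_comment");
      std::cout << n1 << ", " << n2 << '\n';        // NULL, NULL
    }
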
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java
index 2a798fb..34d7f49 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java
@@ -38,6 +38,7 @@ import com.google.common.base.Preconditions;
 public class ShowStatsStmt extends StatementBase {
   protected final TShowStatsOp op_;
   protected final TableName tableName_;
+  protected boolean show_column_minmax_stats_ = false;
 
   // Set during analysis.
   protected FeTable table_;
@@ -124,11 +125,15 @@ public class ShowStatsStmt extends StatementBase {
             " must target an HDFS or Kudu table: " + table_.getFullName());
       }
     }
+    show_column_minmax_stats_ =
+        analyzer.getQueryOptions().isShow_column_minmax_stats();
   }
 
   public TShowStatsParams toThrift() {
     // Ensure the DB is set in the table_name field by using table and not tableName.
-    return new TShowStatsParams(op_,
+    TShowStatsParams showStatsParam = new TShowStatsParams(op_,
         new TableName(table_.getDb().getName(), table_.getName()).toThrift());
+    showStatsParam.setShow_column_minmax_stats(show_column_minmax_stats_);
+    return showStatsParam;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index da2a75b..fcb52f8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -17,19 +17,32 @@
 
 package org.apache.impala.catalog;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.util.Arrays;
 import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.Date;
 import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.DateLiteral;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.thrift.TColumnStats;
+import org.apache.impala.thrift.TColumnValue;
+import org.apache.impala.util.MetaStoreUtil;
+
+import org.apache.log4j.Logger;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -48,6 +61,8 @@ public class ColumnStats {
       PrimitiveType.VARCHAR, PrimitiveType.STRING, PrimitiveType.TIMESTAMP,
       PrimitiveType.TINYINT, PrimitiveType.DECIMAL);
 
+  private static final Logger LOG = Logger.getLogger(ColumnStats.class);
+
   public enum StatsKey {
     NUM_DISTINCT_VALUES("numDVs"),
     NUM_NULLS("numNulls"),
@@ -87,6 +102,8 @@ public class ColumnStats {
   private long numNulls_;
   private long numTrues_;
   private long numFalses_;
+  private TColumnValue lowValue_;
+  private TColumnValue highValue_;
 
   public ColumnStats(Type colType) {
     initColStats(colType);
@@ -104,6 +121,8 @@ public class ColumnStats {
     numNulls_ = other.numNulls_;
     numTrues_ = other.numTrues_;
     numFalses_ = other.numFalses_;
+    lowValue_ = other.lowValue_;
+    highValue_ = other.highValue_;
     validate(null);
   }
 
@@ -120,6 +139,8 @@ public class ColumnStats {
     numNulls_ = -1;
     numTrues_ = -1;
     numFalses_ = -1;
+    lowValue_ = null;
+    highValue_ = null;
     if (colType.isFixedLengthType()) {
       avgSerializedSize_ = colType.getSlotSize();
       avgSize_ = colType.getSlotSize();
@@ -149,6 +170,8 @@ public class ColumnStats {
     }
     stats.numTrues_ = slotStats.getNumTrues();
     stats.numFalses_ = slotStats.getNumFalses();
+    stats.lowValue_ = slotStats.getLowValue();
+    stats.highValue_ = slotStats.getHighValue();
     stats.validate(colType);
     return stats;
   }
@@ -202,6 +225,306 @@ public class ColumnStats {
   public long getNumFalses() { return numFalses_; }
   public boolean hasNumDistinctValues() { return numDistinctValues_ >= 0; }
   public boolean hasStats() { return numNulls_ != -1 || numDistinctValues_ != -1; }
+  public TColumnValue getLowValue() { return lowValue_; }
+  public TColumnValue getHighValue() { return highValue_; }
+
+  /**
+   * Returns the value of the set field as a string. Returns "-1" if 'value' is
+   * null, no field is set, or the set field's value is null.
+   */
+  public String getTColumnValueAsString(TColumnValue value) {
+    if (value == null) return "-1";
+    StringBuilder sb = new StringBuilder();
+
+    if (value.isSetBool_val()) {
+      sb.append(value.bool_val);
+    } else if (value.isSetByte_val()) {
+      sb.append(value.byte_val);
+    } else if (value.isSetShort_val()) {
+      sb.append(value.short_val);
+    } else if (value.isSetInt_val()) {
+      sb.append(value.int_val);
+    } else if (value.isSetLong_val()) {
+      sb.append(value.long_val);
+    } else if (value.isSetDouble_val()) {
+      sb.append(value.double_val);
+    } else if (value.isSetString_val()) {
+      if (value.string_val == null) {
+        sb.append("-1");
+      } else {
+        sb.append(value.string_val);
+      }
+    } else if (value.isSetBinary_val()) {
+      if (value.binary_val == null) {
+        sb.append("-1");
+      } else {
+        org.apache.thrift.TBaseHelper.toString(value.binary_val, sb);
+      }
+    } else if (value.isSetTimestamp_val()) {
+      if (value.timestamp_val == null) {
+        sb.append("-1");
+      } else {
+        org.apache.thrift.TBaseHelper.toString(value.timestamp_val, sb);
+      }
+    } else if (value.isSetDecimal_val()) {
+      if (value.decimal_val == null) {
+        sb.append("-1");
+      } else {
+        sb.append(new String(value.getDecimal_val()));
+      }
+    } else if (value.isSetDate_val()) {
+      LocalDate d = LocalDate.ofEpochDay(value.date_val);
+      sb.append(d.toString());
+    } else {
+      sb.append("-1");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Return the low and high values as strings.
+   */
+  public String getLowValueAsString() { return getTColumnValueAsString(lowValue_); }
+  public String getHighValueAsString() { return getTColumnValueAsString(highValue_); }
+
+  /**
+   * Updates the low value with a numeric literal.
+   */
+  protected void updateLowValue(NumericLiteral literal) {
+    if (lowValue_ == null) lowValue_ = new TColumnValue();
+    if (literal.getType().isScalarType(PrimitiveType.TINYINT)) {
+      int value = literal.getIntValue();
+      if (!lowValue_.isSetByte_val() || value < lowValue_.getByte_val()) {
+        lowValue_.setByte_val((byte) value);
+      }
+    } else if (literal.getType().isScalarType(PrimitiveType.SMALLINT)) {
+      int value = literal.getIntValue();
+      if (!lowValue_.isSetShort_val() || value < lowValue_.getShort_val()) {
+        lowValue_.setShort_val((short) value);
+      }
+    } else if (literal.getType().isScalarType(PrimitiveType.INT)) {
+      int value = literal.getIntValue();
+      if (!lowValue_.isSetInt_val() || value < lowValue_.getInt_val()) {
+        lowValue_.setInt_val(value);
+      }
+    } else if (literal.getType().isScalarType(PrimitiveType.BIGINT)) {
+      long value = literal.getLongValue();
+      if (!lowValue_.isSetLong_val() || value < lowValue_.getLong_val()) {
+        lowValue_.setLong_val(value);
+      }
+    } else if (literal.getType().isFloatingPointType()) {
+      double value = literal.getDoubleValue();
+      if (!lowValue_.isSetDouble_val() || value < lowValue_.getDouble_val()) {
+        lowValue_.setDouble_val(value);
+      }
+    } else if (literal.getType().isDecimal()) {
+      // Decimals are represented as ASCII strings in byte[] in lowValue_.
+      if (!lowValue_.isSetDecimal_val()) {
+        lowValue_.setDecimal_val(literal.getValue().toString().getBytes());
+      } else {
+        BigDecimal value = literal.getValue();
+        BigDecimal lValue = new BigDecimal(new String(lowValue_.getDecimal_val()));
+        if (value.compareTo(lValue) == -1) {
+          lowValue_.setDecimal_val(value.toString().getBytes());
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the high value with a numeric literal.
+   */
+  protected void updateHighValue(NumericLiteral literal) {
+    if (highValue_ == null) highValue_ = new TColumnValue();
+    if (literal.getType().isScalarType(PrimitiveType.TINYINT)) {
+      int value = literal.getIntValue();
+      if (!highValue_.isSetByte_val() || value > highValue_.getByte_val()) {
+        highValue_.setByte_val((byte) value);
+      }
+    } else if (literal.getType().isScalarType(PrimitiveType.SMALLINT)) {
+      int value = literal.getIntValue();
+      if (!highValue_.isSetShort_val() || value > highValue_.getShort_val()) {
+        highValue_.setShort_val((short) value);
+      }
+    } else if (literal.getType().isScalarType(PrimitiveType.INT)) {
+      int value = literal.getIntValue();
+      if (!highValue_.isSetInt_val() || value > highValue_.getInt_val()) {
+        highValue_.setInt_val(value);
+      }
+    } else if (literal.getType().isScalarType(PrimitiveType.BIGINT)) {
+      long value = literal.getLongValue();
+      if (!highValue_.isSetLong_val() || value > highValue_.getLong_val()) {
+        highValue_.setLong_val(value);
+      }
+    } else if (literal.getType().isFloatingPointType()) {
+      double value = literal.getDoubleValue();
+      if (!highValue_.isSetDouble_val() || value > highValue_.getDouble_val()) {
+        highValue_.setDouble_val(value);
+      }
+    } else if (literal.getType().isDecimal()) {
+      // Decimals are represented as ASCII strings in byte[] in highValue_.
+      if (!highValue_.isSetDecimal_val()) {
+        highValue_.setDecimal_val(literal.getValue().toString().getBytes());
+      } else {
+        BigDecimal value = literal.getValue();
+        BigDecimal hValue = new BigDecimal(new String(highValue_.getDecimal_val()));
+        if (value.compareTo(hValue) == 1) {
+          highValue_.setDecimal_val(value.toString().getBytes());
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates the low value with a date literal.
+   */
+  protected void updateLowValue(DateLiteral literal) {
+    if (lowValue_ == null) lowValue_ = new TColumnValue();
+    int value = literal.getValue();
+    if (!lowValue_.isSetDate_val() || value < lowValue_.getDate_val()) {
+      lowValue_.setDate_val(value);
+    }
+  }
+
+  /**
+   * Updates the high value with a date literal.
+   */
+  protected void updateHighValue(DateLiteral literal) {
+    if (highValue_ == null) highValue_ = new TColumnValue();
+    int value = literal.getValue();
+    if (!highValue_.isSetDate_val() || value > highValue_.getDate_val()) {
+      highValue_.setDate_val(value);
+    }
+  }
+
+  /**
+   * Updates the low and high values with 'literal'. If 'literal' is NULL or of a
+   * type that HMS cannot store min/max for, no update is done. This method is
+   * mainly used to update the low and high values for partition columns in HDFS
+   * tables.
+   * TODO: handle DECIMAL.
+   */
+  public void updateLowAndHighValue(LiteralExpr literal) {
+    if (Expr.IS_NULL_LITERAL.apply(literal)) return;
+    if (!MetaStoreUtil.canStoreMinmaxInHMS(literal.getType())) return;
+    if (literal instanceof NumericLiteral) {
+      updateLowValue((NumericLiteral) literal);
+      updateHighValue((NumericLiteral) literal);
+    } else if (literal instanceof DateLiteral) {
+      updateLowValue((DateLiteral) literal);
+      updateHighValue((DateLiteral) literal);
+    }
+  }
+
+  /**
+   * From the source 'longStats', set the low and high values for 'type' (one of
+   * the integer types).
+   */
+  protected void setLowAndHighValue(PrimitiveType type, LongColumnStatsData longStats) {
+    if (!longStats.isSetLowValue()) {
+      lowValue_ = null;
+    } else {
+      Long value = new Long(longStats.getLowValue());
+      lowValue_ = new TColumnValue();
+      switch (type) {
+        case TINYINT:
+          lowValue_.setByte_val(value.byteValue());
+          break;
+        case SMALLINT:
+          lowValue_.setShort_val(value.shortValue());
+          break;
+        case INT:
+          lowValue_.setInt_val(value.intValue());
+          break;
+        case BIGINT:
+          lowValue_.setLong_val(value.longValue());
+          break;
+        default:
+          Preconditions.checkState(
+              false, "Unsupported type encountered in setLowAndHighValue()");
+      }
+    }
+
+    if (!longStats.isSetHighValue()) {
+      highValue_ = null;
+    } else {
+      Long value = new Long(longStats.getHighValue());
+      highValue_ = new TColumnValue();
+      switch (type) {
+        case TINYINT:
+          highValue_.setByte_val(value.byteValue());
+          break;
+        case SMALLINT:
+          highValue_.setShort_val(value.shortValue());
+          break;
+        case INT:
+          highValue_.setInt_val(value.intValue());
+          break;
+        case BIGINT:
+          highValue_.setLong_val(value.longValue());
+          break;
+        default:
+          Preconditions.checkState(
+              false, "Unsupported type encountered in setLowAndHighValue()");
+      }
+    }
+  }
+
+  /**
+   * From the source 'doubleStats', set the low and high values.
+   */
+  protected void setLowAndHighValue(DoubleColumnStatsData doubleStats) {
+    if (!doubleStats.isSetLowValue()) {
+      lowValue_ = null;
+    } else {
+      lowValue_ = new TColumnValue();
+      lowValue_.setDouble_val(doubleStats.getLowValue());
+    }
+
+    if (!doubleStats.isSetHighValue()) {
+      highValue_ = null;
+    } else {
+      highValue_ = new TColumnValue();
+      highValue_.setDouble_val(doubleStats.getHighValue());
+    }
+  }
+
+  /**
+   * From the source 'dateStats', set the low and high values.
+   */
+  protected void setLowAndHighValue(DateColumnStatsData dateStats) {
+    if (!dateStats.isSetLowValue()) {
+      lowValue_ = null;
+    } else {
+      lowValue_ = new TColumnValue();
+      lowValue_.setDate_val((int) dateStats.getLowValue().getDaysSinceEpoch());
+    }
+
+    if (!dateStats.isSetHighValue()) {
+      highValue_ = null;
+    } else {
+      highValue_ = new TColumnValue();
+      highValue_.setDate_val((int) dateStats.getHighValue().getDaysSinceEpoch());
+    }
+  }
+
+  /**
+   * From the source 'decimalStats', set the low and high values.
+   */
+  protected void setLowAndHighValue(DecimalColumnStatsData decimalStats) {
+    if (!decimalStats.isSetLowValue()) {
+      lowValue_ = null;
+    } else {
+      lowValue_ = new TColumnValue();
+      lowValue_.setDecimal_val(decimalStats.getLowValue().getUnscaled());
+    }
+
+    if (!decimalStats.isSetHighValue()) {
+      highValue_ = null;
+    } else {
+      highValue_ = new TColumnValue();
+      highValue_.setDecimal_val(decimalStats.getHighValue().getUnscaled());
+    }
+  }
 
   /**
    * Updates the stats with the given ColumnStatisticsData. If the ColumnStatisticsData
@@ -214,6 +537,16 @@ public class ColumnStats {
     Preconditions.checkState(isSupportedColType(colType));
     initColStats(colType);
     boolean isCompatible = false;
+
+    // Since the low and high values exist only in the following Hive stats objects:
+    //   DateColumnStatsData
+    //   LongColumnStatsData
+    //   DoubleColumnStatsData
+    //   DecimalColumnStatsData
+    // assume no low or high values are available until one of these is seen, at
+    // which point setLowAndHighValue() is called.
+    lowValue_ = null;
+    highValue_ = null;
     switch (colType.getPrimitiveType()) {
       case BOOLEAN:
         isCompatible = statsData.isSetBooleanStats();
@@ -242,6 +575,7 @@ public class ColumnStats {
           LongColumnStatsData longStats = statsData.getLongStats();
           numDistinctValues_ = longStats.getNumDVs();
           numNulls_ = longStats.getNumNulls();
+          setLowAndHighValue(colType.getPrimitiveType(), longStats);
         }
         break;
       case DATE:
@@ -250,6 +584,7 @@ public class ColumnStats {
           DateColumnStatsData dateStats = statsData.getDateStats();
           numDistinctValues_ = dateStats.getNumDVs();
           numNulls_ = dateStats.getNumNulls();
+          setLowAndHighValue(dateStats);
         }
         break;
       case FLOAT:
@@ -259,6 +594,7 @@ public class ColumnStats {
           DoubleColumnStatsData doubleStats = statsData.getDoubleStats();
           numDistinctValues_ = doubleStats.getNumDVs();
           numNulls_ = doubleStats.getNumNulls();
+          setLowAndHighValue(doubleStats);
         }
         break;
       case CHAR:
@@ -302,6 +638,7 @@ public class ColumnStats {
           DecimalColumnStatsData decimalStats = statsData.getDecimalStats();
           numNulls_ = decimalStats.getNumNulls();
           numDistinctValues_ = decimalStats.getNumDVs();
+          setLowAndHighValue(decimalStats);
         }
         break;
       default:
@@ -314,6 +651,74 @@ public class ColumnStats {
   }
 
   /**
+   * Sets the low and high values for a Hive LongColumnStatsData object.
+   */
+  public static void updateLowAndHighForHiveColumnStatsData(
+      Long low_value, Long high_value, LongColumnStatsData longColStatsData) {
+    if (low_value != null) {
+      longColStatsData.setLowValue(low_value.longValue());
+    } else {
+      longColStatsData.unsetLowValue();
+    }
+    if (high_value != null) {
+      longColStatsData.setHighValue(high_value.longValue());
+    } else {
+      longColStatsData.unsetHighValue();
+    }
+  }
+
+  /**
+   * Sets the low and high values for a Hive DoubleColumnStatsData object.
+   */
+  public static void updateLowAndHighForHiveColumnStatsData(
+      Double low_value, Double high_value, DoubleColumnStatsData doubleColStatsData) {
+    if (low_value != null) {
+      doubleColStatsData.setLowValue(low_value.doubleValue());
+    } else {
+      doubleColStatsData.unsetLowValue();
+    }
+    if (high_value != null) {
+      doubleColStatsData.setHighValue(high_value.doubleValue());
+    } else {
+      doubleColStatsData.unsetHighValue();
+    }
+  }
+
+  /**
+   * Sets the low and high values for a Hive DateColumnStatsData object.
+   */
+  public static void updateLowAndHighForHiveColumnStatsData(
+      Date low_value, Date high_value, DateColumnStatsData dateColStatsData) {
+    if (low_value != null) {
+      dateColStatsData.setLowValue(low_value);
+    } else {
+      dateColStatsData.unsetLowValue();
+    }
+    if (high_value != null) {
+      dateColStatsData.setHighValue(high_value);
+    } else {
+      dateColStatsData.unsetHighValue();
+    }
+  }
+
+  /**
+   * Sets the low and high values for a Hive DecimalColumnStatsData object.
+   */
+  public static void updateLowAndHighForHiveColumnStatsData(
+      Decimal low_value, Decimal high_value, DecimalColumnStatsData decimalColStatsData) {
+    if (low_value != null) {
+      decimalColStatsData.setLowValue(low_value);
+    } else {
+      decimalColStatsData.unsetLowValue();
+    }
+    if (high_value != null) {
+      decimalColStatsData.setHighValue(high_value);
+    } else {
+      decimalColStatsData.unsetHighValue();
+    }
+  }
+
+  /**
    * Convert the statistics back into an HMS-compatible ColumnStatisticsData object.
   * This is essentially the inverse of {@link #update(Type, ColumnStatisticsData)}
   * above.
@@ -330,36 +735,121 @@ public class ColumnStats {
     long numNulls = colStats.getNum_nulls();
     long numTrues = colStats.getNum_trues();
     long numFalses = colStats.getNum_falses();
+    boolean isLowValueSet = colStats.isSetLow_value();
+    boolean isHighValueSet = colStats.isSetHigh_value();
     switch(colType.getPrimitiveType()) {
       case BOOLEAN:
         colStatsData.setBooleanStats(
             new BooleanColumnStatsData(numTrues, numFalses, numNulls));
         break;
       case TINYINT:
-        ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        {
+          ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
+          LongColumnStatsData longColStatsData = new LongColumnStatsData(numNulls, ndv);
+          Long lowValue = null;
+          Long highValue = null;
+          if (isLowValueSet && colStats.low_value.isSetByte_val()) {
+            lowValue = new Long(colStats.low_value.getByte_val());
+          }
+          if (isHighValueSet && colStats.high_value.isSetByte_val()) {
+            highValue = new Long(colStats.high_value.getByte_val());
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, longColStatsData);
+          colStatsData.setLongStats(longColStatsData);
+        }
         break;
       case SMALLINT:
-        ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        {
+          ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
+          LongColumnStatsData longColStatsData = new LongColumnStatsData(numNulls, ndv);
+
+          Long lowValue = null;
+          Long highValue = null;
+          if (isLowValueSet && colStats.low_value.isSetShort_val()) {
+            lowValue = new Long(colStats.low_value.getShort_val());
+          }
+          if (isHighValueSet && colStats.high_value.isSetShort_val()) {
+            highValue = new Long(colStats.high_value.getShort_val());
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, longColStatsData);
+
+          colStatsData.setLongStats(longColStatsData);
+        }
         break;
       case INT:
-        ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        {
+          ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
+          LongColumnStatsData longColStatsData = new LongColumnStatsData(numNulls, ndv);
+
+          Long lowValue = null;
+          Long highValue = null;
+          if (isLowValueSet && colStats.low_value.isSetInt_val()) {
+            lowValue = new Long(colStats.low_value.getInt_val());
+          }
+          if (isHighValueSet && colStats.high_value.isSetInt_val()) {
+            highValue = new Long(colStats.high_value.getInt_val());
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, longColStatsData);
+
+          colStatsData.setLongStats(longColStatsData);
+        }
         break;
       case DATE:
-        // Number of distinct dates in the 0001-01-01..9999-12-31 inclusive range is
-        // 3652059.
-        ndv = Math.min(ndv, 3652059);
-        colStatsData.setDateStats(new DateColumnStatsData(numNulls, ndv));
+        {
+          // Number of distinct dates in the 0001-01-01..9999-12-31 inclusive range is
+          // 3652059.
+          ndv = Math.min(ndv, 3652059);
+          DateColumnStatsData dateColStatsData = new DateColumnStatsData(numNulls, ndv);
+          Date lowValue = null;
+          Date highValue = null;
+          if (isLowValueSet && colStats.low_value.isSetDate_val()) {
+            lowValue = new Date(Long.valueOf(colStats.low_value.getDate_val()));
+          }
+          if (isHighValueSet && colStats.high_value.isSetDate_val()) {
+            highValue = new Date(Long.valueOf(colStats.high_value.getDate_val()));
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, dateColStatsData);
+          colStatsData.setDateStats(dateColStatsData);
+        }
         break;
       case BIGINT:
+        {
+          LongColumnStatsData longColStatsData = new LongColumnStatsData(numNulls, ndv);
+
+          Long lowValue = null;
+          Long highValue = null;
+          if (isLowValueSet && colStats.low_value.isSetLong_val()) {
+            lowValue = new Long(colStats.low_value.getLong_val());
+          }
+          if (isHighValueSet && colStats.high_value.isSetLong_val()) {
+            highValue = new Long(colStats.high_value.getLong_val());
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, longColStatsData);
+
+          colStatsData.setLongStats(longColStatsData);
+        }
+        break;
       case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
         colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
         break;
       case FLOAT:
       case DOUBLE:
-        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndv));
+        {
+          DoubleColumnStatsData doubleColStatsData =
+              new DoubleColumnStatsData(numNulls, ndv);
+
+          Double lowValue = null;
+          Double highValue = null;
+          if (isLowValueSet && colStats.low_value.isSetDouble_val()) {
+            lowValue = new Double(colStats.low_value.getDouble_val());
+          }
+          if (isHighValueSet && colStats.high_value.isSetDouble_val()) {
+            highValue = new Double(colStats.high_value.getDouble_val());
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, doubleColStatsData);
+
+          colStatsData.setDoubleStats(doubleColStatsData);
+        }
         break;
       case CHAR:
       case VARCHAR:
@@ -370,9 +860,25 @@ public class ColumnStats {
             new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
         break;
       case DECIMAL:
-        double decMaxNdv = Math.pow(10, colType.getPrecision());
-        ndv = (long) Math.min(ndv, decMaxNdv);
-        colStatsData.setDecimalStats(new DecimalColumnStatsData(numNulls, ndv));
+        {
+          double decMaxNdv = Math.pow(10, colType.getPrecision());
+          ndv = (long) Math.min(ndv, decMaxNdv);
+          DecimalColumnStatsData decimalStatsData =
+              new DecimalColumnStatsData(numNulls, ndv);
+          Decimal lowValue = null;
+          Decimal highValue = null;
+          ScalarType colTypeScalar = (ScalarType) colType;
+          if (isLowValueSet && colStats.low_value.isSetDecimal_val()) {
+            lowValue = new Decimal((short) colTypeScalar.decimalScale(),
+                colStats.low_value.bufferForDecimal_val());
+          }
+          if (isHighValueSet && colStats.high_value.isSetDecimal_val()) {
+            highValue = new Decimal((short) colTypeScalar.decimalScale(),
+                colStats.high_value.bufferForDecimal_val());
+          }
+          updateLowAndHighForHiveColumnStatsData(lowValue, highValue, decimalStatsData);
+          colStatsData.setDecimalStats(decimalStatsData);
+        }
         break;
       default:
         return null;
@@ -463,6 +969,8 @@ public class ColumnStats {
     maxSize_ = stats.getMax_size();
     numDistinctValues_ = stats.getNum_distinct_values();
     numNulls_ = stats.getNum_nulls();
+    lowValue_ = stats.getLow_value();
+    highValue_ = stats.getHigh_value();
     validate(colType);
   }
 
@@ -474,6 +982,8 @@ public class ColumnStats {
     colStats.setNum_nulls(numNulls_);
     colStats.setNum_trues(numTrues_);
     colStats.setNum_falses(numFalses_);
+    colStats.setLow_value(lowValue_);
+    colStats.setHigh_value(highValue_);
     return colStats;
   }
 
@@ -510,6 +1020,8 @@ public class ColumnStats {
         .add("numNulls_", numNulls_)
         .add("numTrues", numTrues_)
         .add("numFalses", numFalses_)
+        .add("lowValue", getLowValueAsString())
+        .add("highValue", getHighValueAsString())
         .toString();
   }
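
For partition columns the bounds are not computed by a child query at all; they
are accumulated from the partition-key literals as partitions are registered (see
the HdfsTable hunk below), with NULL literals skipped and an absent bound meaning
no non-NULL value seen yet. A compact C++17 sketch of that accumulation, with
std::optional playing the role of the nullable lowValue_/highValue_ fields:

    #include <cassert>
    #include <optional>

    struct Range {
      std::optional<long long> low, high;
      // NULL literals never move the bounds; the first non-NULL value sets both.
      void Update(std::optional<long long> literal) {
        if (!literal) return;
        if (!low || *literal < *low) low = literal;
        if (!high || *literal > *high) high = literal;
      }
    };

    int main() {
      Range r;
      r.Update(std::nullopt);  // NULL partition key: ignored
      assert(!r.low && !r.high);
      r.Update(7); r.Update(3); r.Update(9);
      assert(*r.low == 3 && *r.high == 9);
    }
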
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index db6c635..e228f36 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -272,4 +272,11 @@ public enum HdfsFileFormat {
     }
     return result;
   }
+
+  /**
+   * Returns true if the format is Parquet, false otherwise.
+   */
+  public boolean isParquetBased() {
+    return this == HdfsFileFormat.PARQUET || this == HdfsFileFormat.HUDI_PARQUET;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 59b6572..f0ecdee 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -950,6 +950,10 @@ public class HdfsTable extends Table implements FeFsTable {
         nullPartitionIds_.get(i).add(Long.valueOf(partition.getId()));
         continue;
       }
+
+      // Update the low and high values with this partition-key 'literal'.
+      stats.updateLowAndHighValue(literal);
+
       Set<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
       if (partitionIds == null) {
         partitionIds = new HashSet<>();
@@ -2749,4 +2753,13 @@ public class HdfsTable extends Table implements FeFsTable {
   public void setLastVersionSeenByTopicUpdate(long version) {
     lastVersionSeenByTopicUpdate_ = version;
   }
+
+  /**
+   * Returns true if every partition of this table uses a Parquet-based format.
+   */
+  public boolean isParquetTable() {
+    for (FeFsPartition partition: partitionMap_.values()) {
+      if (!partition.getFileFormat().isParquetBased()) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index f22b835..6abc198 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -382,14 +382,6 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Returns true if this HdfsFileFormat is PARQUET or HUDI_PARQUET
-   */
-  private boolean isParquetBased(HdfsFileFormat fileFormat) {
-    return fileFormat == HdfsFileFormat.PARQUET
-        || fileFormat == HdfsFileFormat.HUDI_PARQUET;
-  }
-
-  /**
    * Returns true if the Parquet count(*) optimization can be applied to the query block
    * of this scan node.
    */
@@ -1038,7 +1030,7 @@ public class HdfsScanNode extends ScanNode {
 
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
       fileFormats_.add(partition.getFileFormat());
-      if (!isParquetBased(partition.getFileFormat())) {
+      if (!partition.getFileFormat().isParquetBased()) {
         allParquet = false;
       }
       Preconditions.checkState(partition.getId() >= 0);
@@ -1046,7 +1038,7 @@ public class HdfsScanNode extends ScanNode {
       if (!fsHasBlocks) {
         // Limit the scan range length if generating scan ranges (and we're not
         // short-circuiting the scan for a partition key scan).
-        long defaultBlockSize = isParquetBased(partition.getFileFormat()) ?
+        long defaultBlockSize = partition.getFileFormat().isParquetBased() ?
             analyzer.getQueryOptions().parquet_object_store_split_size :
             partitionFs.getDefaultBlockSize(partition.getLocationPath());
         long maxBlockSize =
@@ -1886,7 +1878,7 @@ public class HdfsScanNode extends ScanNode {
     int perHostScanRanges = 0;
     for (HdfsFileFormat format : fileFormats_) {
       int partitionScanRange = 0;
-      if (isParquetBased(format) || format == HdfsFileFormat.ORC) {
+      if (format.isParquetBased() || format == HdfsFileFormat.ORC) {
         Preconditions.checkNotNull(columnReservations);
         // For the purpose of this estimation, the number of per-host scan ranges for
         // Parquet/HUDI_PARQUET/ORC files are equal to the number of columns read from the
@@ -1962,7 +1954,7 @@ public class HdfsScanNode extends ScanNode {
       // TODO: IMPALA-6875 - ORC should compute total reservation across columns once the
       // ORC scanner supports reservations. For now it is treated the same as a
       // row-oriented format because there is no per-column reservation.
-      if (isParquetBased(format)) {
+      if (format.isParquetBased()) {
         // With Parquet, we first read the footer then all of the materialized columns in
         // parallel.
         for (long columnReservation : columnReservations) {
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 08f7286..085a025 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -45,6 +45,7 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
@@ -53,6 +54,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
@@ -224,14 +226,20 @@ public final class RuntimeFilterGenerator {
       public final boolean isBoundByPartitionColumns;
       // Indicates if 'node' is in the same fragment as the join that produces the filter
       public final boolean isLocalTarget;
+      // The low and high values of the column on which the filter is applied.
+      // Null when the target is not an HDFS column with available stats.
+      public final TColumnValue lowValue;
+      public final TColumnValue highValue;
 
       public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
-          boolean isBoundByPartitionColumns, boolean isLocalTarget) {
+          boolean isBoundByPartitionColumns, boolean isLocalTarget, TColumnValue lowValue,
+          TColumnValue highValue) {
         Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
         node = targetNode;
         expr = targetExpr;
         this.isBoundByPartitionColumns = isBoundByPartitionColumns;
         this.isLocalTarget = isLocalTarget;
+        this.lowValue = lowValue;
+        this.highValue = highValue;
       }
 
       public TRuntimeFilterTargetDesc toThrift() {
@@ -253,6 +261,8 @@ public final class RuntimeFilterGenerator {
           tFilterTarget.setKudu_col_name(col.getKuduName());
           tFilterTarget.setKudu_col_type(col.getType().toThrift());
         }
+        tFilterTarget.setLow_value(lowValue);
+        tFilterTarget.setHigh_value(highValue);
         return tFilterTarget;
       }
 
@@ -263,6 +273,8 @@ public final class RuntimeFilterGenerator {
             .append("Target expr: " + expr.debugString() + " ")
             .append("Partition columns: " + isBoundByPartitionColumns)
             .append("Is local: " + isLocalTarget)
+            .append("lowValue: " + lowValue.toString())
+            .append("highValue: " + highValue.toString())
             .toString();
       }
     }
@@ -904,8 +916,21 @@ public final class RuntimeFilterGenerator {
           }
         }
       }
-      RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
-          scanNode, targetExpr, isBoundByPartitionColumns, isLocalTarget);
+      TColumnValue lowValue = null;
+      TColumnValue highValue = null;
+      if (scanNode instanceof HdfsScanNode) {
+        SlotRef slotRefInScan = targetExpr.unwrapSlotRef(true);
+        if (slotRefInScan != null) {
+          Column col = slotRefInScan.getDesc().getColumn();
+          if (col != null) {
+            lowValue = col.getStats().getLowValue();
+            highValue = col.getStats().getHighValue();
+          }
+        }
+      }
+      RuntimeFilter.RuntimeFilterTarget target =
+          new RuntimeFilter.RuntimeFilterTarget(scanNode, targetExpr,
+              isBoundByPartitionColumns, isLocalTarget, lowValue, highValue);
       filter.addTarget(target);
     }
 
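
The lowValue/highValue carried by each target give the scan side the column's
stats-derived range, so a min/max filter that already spans the whole column
can be discarded before any row groups are probed. A minimal sketch of that
idea, assuming long-typed bounds for illustration (the real check runs in the
C++ backend over TColumnValue and covers all supported types):

    // Hypothetical illustration only; not the backend implementation.
    public final class FilterCoverageCheck {
      /**
       * Returns true if a min/max filter spanning [filterLow, filterHigh]
       * covers the entire column range [columnLow, columnHigh] known from
       * column stats; such a filter cannot eliminate anything and is not
       * worth applying.
       */
      public static boolean coversWholeColumn(long filterLow, long filterHigh,
          long columnLow, long columnHigh) {
        return filterLow <= columnLow && columnHigh <= filterHigh;
      }

      public static void main(String[] args) {
        // Stats say the column spans [1, 50000]: a filter spanning [10, 90]
        // can skip data, while one spanning [0, 60000] cannot.
        System.out.println(coversWholeColumn(10, 90, 1, 50000));   // false
        System.out.println(coversWholeColumn(0, 60000, 1, 50000)); // true
      }
    }
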
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 9c483ff..bfc9df3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1336,10 +1336,12 @@ public class CatalogOpExecutor {
               ndvCap, entry.getValue(), tableCol.getType());
       if (colStatsData == null) continue;
       if (LOG.isTraceEnabled()) {
-        LOG.trace(String.format("Updating column stats for %s: numDVs=%d numNulls=%d " +
-            "maxSize=%d avgSize=%.2f", colName, entry.getValue().getNum_distinct_values(),
+        LOG.trace(String.format("Updating column stats for %s: numDVs=%d numNulls=%d "
+                + "maxSize=%d avgSize=%.2f minValue=%s maxValue=%s",
+            colName, entry.getValue().getNum_distinct_values(),
             entry.getValue().getNum_nulls(), entry.getValue().getMax_size(),
-            entry.getValue().getAvg_size()));
+            entry.getValue().getAvg_size(), entry.getValue().getLow_value(),
+            entry.getValue().getHigh_value()));
       }
       ColumnStatisticsObj colStatsObj = new ColumnStatisticsObj(colName,
           tableCol.getType().toString().toLowerCase(), colStatsData);
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index d871e4b..959cae8 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1200,20 +1200,20 @@ public class Frontend {
   /**
    * Generate result set and schema for a SHOW COLUMN STATS command.
    */
-  public TResultSet getColumnStats(String dbName, String tableName)
+  public TResultSet getColumnStats(String dbName, String tableName, boolean showMinMax)
       throws ImpalaException {
     RetryTracker retries = new RetryTracker(
         String.format("fetching column stats from %s.%s", dbName, tableName));
     while (true) {
       try {
-        return doGetColumnStats(dbName, tableName);
+        return doGetColumnStats(dbName, tableName, showMinMax);
       } catch(InconsistentMetadataFetchException e) {
         retries.handleRetryOrThrow(e);
       }
     }
   }
 
-  private TResultSet doGetColumnStats(String dbName, String tableName)
+  private TResultSet doGetColumnStats(String dbName, String tableName, boolean showMinMax)
       throws ImpalaException {
     FeTable table = getCatalog().getTable(dbName, tableName);
     TResultSet result = new TResultSet();
@@ -1228,18 +1228,36 @@ public class Frontend {
     resultSchema.addToColumns(new TColumn("Avg Size", Type.DOUBLE.toThrift()));
     resultSchema.addToColumns(new TColumn("#Trues", Type.BIGINT.toThrift()));
     resultSchema.addToColumns(new TColumn("#Falses", Type.BIGINT.toThrift()));
+    if (showMinMax) {
+      resultSchema.addToColumns(new TColumn("Min", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Max", Type.STRING.toThrift()));
+    }
 
     for (Column c: table.getColumnsInHiveOrder()) {
       TResultRowBuilder rowBuilder = new TResultRowBuilder();
-      // Add name, type, NDVs, numNulls, max size and avg size.
-      rowBuilder.add(c.getName())
-          .add(c.getType().toSql())
-          .add(c.getStats().getNumDistinctValues())
-          .add(c.getStats().getNumNulls())
-          .add(c.getStats().getMaxSize())
-          .add(c.getStats().getAvgSize())
-          .add(c.getStats().getNumTrues())
-          .add(c.getStats().getNumFalses());
+      // Add name, type, NDVs, numNulls, max size, avg size, and conditionally
+      // the min value and max value.
+      if (showMinMax) {
+        rowBuilder.add(c.getName())
+            .add(c.getType().toSql())
+            .add(c.getStats().getNumDistinctValues())
+            .add(c.getStats().getNumNulls())
+            .add(c.getStats().getMaxSize())
+            .add(c.getStats().getAvgSize())
+            .add(c.getStats().getNumTrues())
+            .add(c.getStats().getNumFalses())
+            .add(c.getStats().getLowValueAsString())
+            .add(c.getStats().getHighValueAsString());
+      } else {
+        rowBuilder.add(c.getName())
+            .add(c.getType().toSql())
+            .add(c.getStats().getNumDistinctValues())
+            .add(c.getStats().getNumNulls())
+            .add(c.getStats().getMaxSize())
+            .add(c.getStats().getAvgSize())
+            .add(c.getStats().getNumTrues())
+            .add(c.getStats().getNumFalses());
+      }
       result.addToRows(rowBuilder.get());
     }
     return result;
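
The two rowBuilder branches above differ only in the trailing Min/Max cells.
A hypothetical refactoring of the loop body (same behavior, same classes as
the patch, shown only as a sketch) would add the eight shared columns once:

    // Hypothetical alternative to the if/else above; not part of the patch.
    TResultRowBuilder rowBuilder = new TResultRowBuilder();
    rowBuilder.add(c.getName())
        .add(c.getType().toSql())
        .add(c.getStats().getNumDistinctValues())
        .add(c.getStats().getNumNulls())
        .add(c.getStats().getMaxSize())
        .add(c.getStats().getAvgSize())
        .add(c.getStats().getNumTrues())
        .add(c.getStats().getNumFalses());
    if (showMinMax) {
      // Added only when SHOW COLUMN STATS runs with the min/max option set.
      rowBuilder.add(c.getStats().getLowValueAsString())
          .add(c.getStats().getHighValueAsString());
    }
    result.addToRows(rowBuilder.get());
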
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 5c3cb81..0f01449 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -385,7 +385,8 @@ public class JniFrontend {
 
     if (params.op == TShowStatsOp.COLUMN_STATS) {
       result = frontend_.getColumnStats(params.getTable_name().getDb_name(),
-          params.getTable_name().getTable_name());
+          params.getTable_name().getTable_name(),
+          params.isSetShow_column_minmax_stats() && params.show_column_minmax_stats);
     } else {
       result = frontend_.getTableStats(params.getTable_name().getDb_name(),
           params.getTable_name().getTable_name(), params.op);
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index db5655b..6a550ea 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.thrift.TException;
@@ -359,4 +360,12 @@ public class MetaStoreUtil {
       return writeId_;
     }
   }
+
+  /**
+   * Returns true if the min/max stats of 'type' can be stored in HMS (Hive metastore).
+   */
+  public static boolean canStoreMinmaxInHMS(Type type) {
+    return (type.isIntegerType() || type.isFloatingPointType() || type.isDecimal()
+        || type.isDate());
+  }
 }
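
canStoreMinmaxInHMS() is the single gate for which column types have their
min/max stats persisted. A quick probe of the predicate, assuming the Type
constants and ScalarType factory from org.apache.impala.catalog:

    import org.apache.impala.catalog.ScalarType;
    import org.apache.impala.catalog.Type;
    import org.apache.impala.util.MetaStoreUtil;

    public class MinmaxTypeProbe {
      public static void main(String[] args) {
        System.out.println(MetaStoreUtil.canStoreMinmaxInHMS(Type.INT));     // true
        System.out.println(MetaStoreUtil.canStoreMinmaxInHMS(Type.DOUBLE));  // true
        System.out.println(MetaStoreUtil.canStoreMinmaxInHMS(Type.DATE));    // true
        System.out.println(MetaStoreUtil.canStoreMinmaxInHMS(
            ScalarType.createDecimalType(12, 2)));                           // true
        // Strings, booleans and timestamps are not covered, which is why the
        // tests below expect '-1' placeholders for those columns.
        System.out.println(MetaStoreUtil.canStoreMinmaxInHMS(Type.STRING));    // false
        System.out.println(MetaStoreUtil.canStoreMinmaxInHMS(Type.TIMESTAMP)); // false
      }
    }
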
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-column-minmax.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-column-minmax.test
new file mode 100644
index 0000000..db0f06c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-column-minmax.test
@@ -0,0 +1,95 @@
+====
+---- QUERY
+##################################
+# Create a new alltypestiny table.
+##################################
+drop table if exists alltypestiny;
+CREATE TABLE alltypestiny
+STORED AS PARQUET
+as select * from functional_parquet.alltypestiny;
+====
+---- QUERY
+# Compute stats including the min/max for integers and floats.
+set compute_column_minmax_stats = true;
+compute stats alltypestiny;
+====
+---- QUERY
+# Show column stats including the min/max.
+set show_column_minmax_stats = true;
+show column stats alltypestiny;
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE, #TRUES, #FALSES, MIN, MAX
+---- RESULTS
+'id','INT',8,0,4,4.0,-1,-1,'0','7'
+'bool_col','BOOLEAN',2,0,1,1.0,4,4,'-1','-1'
+'tinyint_col','TINYINT',2,0,1,1.0,-1,-1,'0','1'
+'smallint_col','SMALLINT',2,0,2,2.0,-1,-1,'0','1'
+'int_col','INT',2,0,4,4.0,-1,-1,'0','1'
+'bigint_col','BIGINT',2,0,8,8.0,-1,-1,'0','10'
+'float_col','FLOAT',2,0,4,4.0,-1,-1,'0.0','1.100000023841858'
+'double_col','DOUBLE',2,0,8,8.0,-1,-1,'0.0','10.1'
+'date_string_col','STRING',4,0,8,8.0,-1,-1,'-1','-1'
+'string_col','STRING',2,0,1,1.0,-1,-1,'-1','-1'
+'timestamp_col','TIMESTAMP',8,0,16,16.0,-1,-1,'-1','-1'
+'year','INT',1,0,4,4.0,-1,-1,'2009','2009'
+'month','INT',4,0,4,4.0,-1,-1,'1','4'
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT, STRING, STRING
+====
+---- QUERY
+##############################
+# Create a new date_tbl table.
+##############################
+drop table if exists date_tbl;
+CREATE TABLE date_tbl
+STORED AS PARQUET
+as select * from functional_parquet.date_tbl;
+====
+---- QUERY
+# Compute stats including the min/max for date types.
+set compute_column_minmax_stats = true;
+compute stats date_tbl;
+====
+---- QUERY
+# Show column stats including the min/max.
+set show_column_minmax_stats = true;
+show column stats date_tbl;
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE, #TRUES, #FALSES, MIN, MAX
+---- RESULTS
+'id_col','INT',22,0,4,4,-1,-1,'0','31'
+'date_col','DATE',16,2,4,4,-1,-1,'0001-01-01','9999-12-31'
+'date_part','DATE',4,0,4,4,-1,-1,'0001-01-01','9999-12-31'
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT, STRING, STRING
+====
+---- QUERY
+#################################
+# Create a new decimal_tbl table.
+#################################
+drop table if exists decimal_tbl;
+CREATE TABLE decimal_tbl
+STORED AS PARQUET
+as select * from functional_parquet.decimal_tbl;
+====
+---- QUERY
+# Compute stats including the min/max for decimal types.
+set compute_column_minmax_stats = true;
+compute stats decimal_tbl;
+====
+---- QUERY
+# Show column stats including the min/max.
+set show_column_minmax_stats = true;
+show column stats decimal_tbl;
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE, #TRUES, #FALSES, MIN, MAX
+---- RESULTS
+'d1','DECIMAL(9,0)',4,0,4,4,-1,-1,'1234','132842'
+'d2','DECIMAL(10,0)',3,0,8,8,-1,-1,'111','2222'
+'d3','DECIMAL(20,10)',5,0,16,16,-1,-1,'1.2345678900','12345.6789000000'
+'d4','DECIMAL(38,38)',1,0,16,16,-1,-1,'0.12345678900000000000000000000000000000','0.12345678900000000000000000000000000000'
+'d5','DECIMAL(10,5)',5,0,8,8,-1,-1,'0.10000','12345.78900'
+'d6','DECIMAL(9,0)',1,0,4,4,-1,-1,'1','1'
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT, STRING, STRING
+====
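
Note the '-1' placeholders above for the BOOLEAN, STRING and TIMESTAMP min/max
cells: those types fall outside canStoreMinmaxInHMS(), so no bounds are
computed for them. A hedged sketch of how such a bound could be rendered for
display; the real logic is getLowValueAsString()/getHighValueAsString() in
ColumnStats.java, and the accessors below assume standard Thrift-generated
methods on TColumnValue:

    import org.apache.impala.thrift.TColumnValue;

    final class BoundRendering {
      /** Hypothetical helper mirroring the '-1' placeholders above. */
      static String boundToString(TColumnValue v) {
        if (v == null) return "-1";  // no stats, or a type without min/max
        if (v.isSetLong_val()) return String.valueOf(v.getLong_val());
        if (v.isSetDouble_val()) return String.valueOf(v.getDouble_val());
        if (v.isSetString_val()) return v.getString_val();
        return "-1";
      }
    }
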
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
index 49641ea..55f6272 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
@@ -5,11 +5,11 @@
 # with controlled number of rows per page.
 ###################################################
 set PARQUET_PAGE_ROW_COUNT_LIMIT=24000;
-drop table if exists tpch_parquet.lineitem_orderkey_only;
-CREATE TABLE tpch_parquet.lineitem_orderkey_only(l_orderkey bigint)
+drop table if exists lineitem_orderkey_only;
+CREATE TABLE lineitem_orderkey_only(l_orderkey bigint)
 sort by (l_orderkey)
 STORED AS PARQUET;
-insert into tpch_parquet.lineitem_orderkey_only
+insert into lineitem_orderkey_only
 select l_orderkey from tpch_parquet.lineitem;
 ====
 ---- QUERY
@@ -20,7 +20,7 @@ SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET MINMAX_FILTERING_LEVEL=PAGE;
 SET MINMAX_FILTER_THRESHOLD=0.5;
 select straight_join a.l_orderkey from
-tpch_parquet.lineitem_orderkey_only a join [SHUFFLE] tpch_parquet.orders b
+lineitem_orderkey_only a join [SHUFFLE] tpch_parquet.orders b
 where a.l_orderkey = b.o_orderkey
 and b.o_custkey = 5 order by l_orderkey;
 ---- RESULTS
@@ -48,12 +48,6 @@ aggregation(SUM, NumRuntimeFilteredPages)> 200
 ====
 ---- QUERY
 ###################################################
-# Drop the table.
-###################################################
-drop table if exists tpch_parquet.lineitem_orderkey_only;
-====
----- QUERY
-###################################################
 # ss_sold_time_sk is INT.
 ###################################################
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
@@ -119,8 +113,8 @@ aggregation(SUM, NumRuntimeFilteredPages): 0
 # Create the lineitem table with sorted l_shipdate
 # which is a STRING.
 ###################################################
-drop table if exists tpch_parquet.lineitem_sorted_l_shipdate;
-CREATE TABLE tpch_parquet.lineitem_sorted_l_shipdate
+drop table if exists lineitem_sorted_l_shipdate;
+CREATE TABLE lineitem_sorted_l_shipdate
 sort by (l_shipdate)
 STORED AS PARQUET
 as select * from tpch_parquet.lineitem;
@@ -135,7 +129,7 @@ SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET MINMAX_FILTER_THRESHOLD=0.5;
 SET MINMAX_FILTERING_LEVEL=PAGE;
 select straight_join count(*)
-from tpch_parquet.lineitem_sorted_l_shipdate a join [SHUFFLE]
+from lineitem_sorted_l_shipdate a join [SHUFFLE]
 tpch_parquet.orders b
 where a.l_shipdate = b.o_orderdate and b.o_orderkey = 2
 ---- RESULTS
@@ -152,7 +146,7 @@ aggregation(SUM, NumRuntimeFilteredPages): 120
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET MINMAX_FILTER_THRESHOLD=0.0;
 select straight_join count(*)
-from tpch_parquet.lineitem_sorted_l_shipdate a join [SHUFFLE]
+from lineitem_sorted_l_shipdate a join [SHUFFLE]
 tpch_parquet.orders b
 where a.l_shipdate = b.o_orderdate and b.o_orderkey = 2
 ---- RESULTS
@@ -161,18 +155,12 @@ where a.l_shipdate = b.o_orderdate and b.o_orderkey = 2
 aggregation(SUM, NumRuntimeFilteredPages): 0
 ====
 ---- QUERY
-###################################################
-# Drop the table.
-###################################################
-drop table if exists tpch_parquet.lineitem_sorted_l_shipdate;
-====
----- QUERY
 ##################################################
 # Create the lineitem table with sorted
 # l_extendedprice which is DECIMAL(12,2).
 ###################################################
-drop TABLE if exists tpch_parquet.lineitem_sorted_l_extendedprice;
-CREATE TABLE tpch_parquet.lineitem_sorted_l_extendedprice
+drop TABLE if exists lineitem_sorted_l_extendedprice;
+CREATE TABLE lineitem_sorted_l_extendedprice
 sort by (l_extendedprice)
 STORED AS PARQUET
 as select * from tpch_parquet.lineitem;
@@ -186,8 +174,8 @@ SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET MINMAX_FILTER_THRESHOLD=1.0;
 SET MINMAX_FILTERING_LEVEL=PAGE;
 select straight_join count(*)
-from tpch_parquet.lineitem_sorted_l_extendedprice a join [SHUFFLE]
-tpch_parquet.lineitem_sorted_l_extendedprice b
+from lineitem_sorted_l_extendedprice a join [SHUFFLE]
+lineitem_sorted_l_extendedprice b
 where a.l_extendedprice = b.l_extendedprice and b.l_orderkey = 1;
 ---- RESULTS
 36
@@ -203,8 +191,8 @@ aggregation(SUM, NumRuntimeFilteredPages): 255
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET MINMAX_FILTER_THRESHOLD=0.0;
 select straight_join count(*)
-from tpch_parquet.lineitem_sorted_l_extendedprice a join [SHUFFLE]
-tpch_parquet.lineitem_sorted_l_extendedprice b
+from lineitem_sorted_l_extendedprice a join [SHUFFLE]
+lineitem_sorted_l_extendedprice b
 where a.l_extendedprice = b.l_extendedprice and b.l_orderkey = 1;
 ---- RESULTS
 36
@@ -213,7 +201,66 @@ aggregation(SUM, NumRuntimeFilteredPages): 0
 ====
 ---- QUERY
 ###################################################
-# Drop the table.
+# Create store_sales in unique_database and compute stats.
 ###################################################
-drop table if exists tpch_parquet.lineitem_sorted_l_extendedprice;
+drop table if exists store_sales;
+CREATE TABLE store_sales
+partitioned by (ss_sold_date_sk)
+STORED AS PARQUET
+as select * from tpcds_parquet.store_sales;
+set compute_column_minmax_stats = true;
+compute stats store_sales(ss_addr_sk);
+====
+---- QUERY
+###################################################
+# Check the number of row groups whose filters are
+# rejected by column stats, using a partitioned join.
+###################################################
+set minmax_filter_threshold=0.5;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select count(a.ss_sold_time_sk) from
+store_sales a join [SHUFFLE] store_sales b
+on a.ss_addr_sk = b.ss_addr_sk where
+b.ss_customer_sk < 10;
+---- RESULTS
+12728
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroupsSkippedByUnusefulFilters)> 1700
+====
+---- QUERY
+###################################################
+# Check the number of row groups whose filters are
+# rejected by column stats, using a broadcast join.
+###################################################
+set minmax_filter_threshold=0.5;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+select count(a.ss_sold_time_sk) from
+store_sales a join store_sales b
+on a.ss_addr_sk = b.ss_addr_sk where
+b.ss_customer_sk < 10;
+---- RESULTS
+12728
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroupsSkippedByUnusefulFilters)> 1700
+====
+---- QUERY
+###################################################
+# Clear the stats on store_sales.
+###################################################
+drop stats store_sales;
+====
+---- QUERY
+###################################################
+# The number of row groups with filters rejected by
+# column stats should drop to 0.
+###################################################
+set minmax_filter_threshold=0.5;
+select count(a.ss_sold_time_sk) from
+store_sales a join [SHUFFLE] store_sales b
+on a.ss_addr_sk = b.ss_addr_sk where
+b.ss_customer_sk < 10;
+---- RESULTS
+12728
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroupsSkippedByUnusefulFilters): 0
 ====
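
NumRowGroupsSkippedByUnusefulFilters counts row groups where the scanner,
using the stats-derived [low, high] range of the column, concluded the
arriving min/max filter could not eliminate anything; DROP STATS removes that
information, so the counter falls back to 0. A hedged sketch of the kind of
overlap test the MINMAX_FILTER_THRESHOLD option implies (the real code is in
the C++ Parquet scanner; doubles are used for illustration):

    // Hypothetical illustration of an overlap-threshold test.
    public final class OverlapThreshold {
      /**
       * Returns true when the filter is worth applying: the fraction of the
       * data range [dataLow, dataHigh] overlapped by the filter stays below
       * 'threshold'. A ratio near 1.0 means the filter keeps nearly all rows.
       */
      public static boolean filterIsUseful(double filterLow, double filterHigh,
          double dataLow, double dataHigh, double threshold) {
        double span = dataHigh - dataLow;
        if (span <= 0) return true;  // degenerate range: trivially cheap
        double overlap = Math.max(0.0,
            Math.min(filterHigh, dataHigh) - Math.max(filterLow, dataLow));
        return (overlap / span) < threshold;
      }

      public static void main(String[] args) {
        // With threshold 0.5, as in the queries above:
        System.out.println(filterIsUseful(10, 90, 1, 50000, 0.5));   // true
        System.out.println(filterIsUseful(0, 60000, 1, 50000, 0.5)); // false
      }
    }
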
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index 5057768..bbb0a40 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -428,3 +428,20 @@ class TestIncompatibleColStats(ImpalaTestSuite):
     self.client.execute("compute stats %s" % table_name)
     result = self.client.execute("select s from %s" % table_name)
     assert len(result.data) == 10
+
+
+# Tests column min/max stats, currently enabled only for Parquet tables.
+class TestParquetComputeColumnMinMax(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestParquetComputeColumnMinMax, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_compute_stats(self, vector, unique_database):
+    self.run_test_case('QueryTest/compute-stats-column-minmax', vector, unique_database)
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index a60fa3b..2f9b6b6 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -282,9 +282,9 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
     if build_runs_slowly:
       add_exec_option_dimension(cls, "async_codegen", 1)
 
-  def test_overlap_min_max_filters(self, vector):
+  def test_overlap_min_max_filters(self, vector, unique_database):
     self.execute_query("SET MINMAX_FILTER_THRESHOLD=0.5")
-    self.run_test_case('QueryTest/overlap_min_max_filters', vector,
+    self.run_test_case('QueryTest/overlap_min_max_filters', vector, unique_database,
         test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
 # Apply both Bloom filter and Minmax filters