Posted to commits@impala.apache.org by st...@apache.org on 2021/07/11 12:58:10 UTC

[impala] branch master updated (59d3285 -> 84d7843)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 59d3285  IMPALA-9822: Add warnings when row format delimiters are ignored on non-text/sequence tables
     new 025500c  IMPALA-10489: Implement JWT support
     new 84d7843  IMPALA-10738: Min/max filters should be enabled for partition columns

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CMakeLists.txt                                     |   6 +
 be/CMakeLists.txt                                  |   1 +
 be/src/exec/hdfs-scan-node-base.cc                 |   3 +-
 be/src/exec/hdfs-scanner.h                         |   3 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  12 +
 be/src/exec/parquet/hdfs-parquet-scanner.h         |   4 +
 be/src/rpc/auth-provider.h                         |   6 +-
 be/src/rpc/authentication.cc                       | 103 ++-
 be/src/runtime/runtime-filter.h                    |   5 +
 be/src/service/impala-server.cc                    |  17 +
 be/src/service/query-options.cc                    |   7 +-
 be/src/service/query-options.h                     |   6 +-
 be/src/transport/THttpServer.cpp                   |  35 +-
 be/src/transport/THttpServer.h                     |  30 +-
 be/src/util/CMakeLists.txt                         |   3 +
 be/src/util/jwt-util-internal.h                    | 224 +++++++
 be/src/util/jwt-util-test.cc                       | 702 +++++++++++++++++++++
 be/src/util/jwt-util.cc                            | 595 +++++++++++++++++
 be/src/util/jwt-util.h                             |  91 +++
 be/src/util/webserver.cc                           |  78 ++-
 be/src/util/webserver.h                            |  14 +
 bin/bootstrap_toolchain.py                         |   2 +-
 bin/impala-config.sh                               |   2 +
 bin/rat_exclude_files.txt                          |   1 +
 .../{FindRapidJson.cmake => FindJwtCpp.cmake}      |  22 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   4 +
 common/thrift/generate_error_codes.py              |   4 +
 common/thrift/metrics.json                         |  46 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  63 +-
 .../java/org/apache/impala/planner/Planner.java    |   5 +
 .../impala/planner/RuntimeFilterGenerator.java     |  34 +-
 .../apache/impala/customcluster/JwtHttpTest.java   | 217 +++++++
 .../impala/customcluster/JwtWebserverTest.java     | 141 +++++
 .../apache/impala/customcluster/LdapHS2Test.java   |  75 +++
 .../impala/customcluster/LdapWebserverTest.java    |  52 ++
 .../org/apache/impala/planner/PlannerTest.java     |   4 +
 .../apache/impala/planner/TpcdsPlannerTest.java    |   2 +
 testdata/jwt/jwks_rs256.json                       |   7 +
 .../PlannerTest/bloom-filter-assignment.test       |  16 +-
 ...-runtime-filters-hdfs-num-rows-est-enabled.test |  53 +-
 .../PlannerTest/min-max-runtime-filters.test       |  29 +-
 .../PlannerTest/runtime-filter-query-options.test  |   4 +-
 .../queries/QueryTest/overlap_min_max_filters.test |  14 +-
 ...erlap_min_max_filters_on_partition_columns.test |  61 ++
 .../queries/QueryTest/runtime_filters.test         |   1 +
 .../queries/QueryTest/runtime_filters_mt_dop.test  |   1 +
 tests/query_test/test_runtime_filters.py           |  12 +-
 48 files changed, 2688 insertions(+), 132 deletions(-)
 create mode 100644 be/src/util/jwt-util-internal.h
 create mode 100644 be/src/util/jwt-util-test.cc
 create mode 100644 be/src/util/jwt-util.cc
 create mode 100644 be/src/util/jwt-util.h
 copy cmake_modules/{FindRapidJson.cmake => FindJwtCpp.cmake} (67%)
 create mode 100644 fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java
 create mode 100644 fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java
 create mode 100644 testdata/jwt/jwks_rs256.json
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_partition_columns.test

[impala] 01/02: IMPALA-10489: Implement JWT support

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 025500ccb5fed088e58da7bb7a8021088a9bba98
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Thu May 6 22:34:25 2021 -0700

    IMPALA-10489: Implement JWT support
    
    This patch adds JWT support with the following functionality:
     * Load and parse the JWKS from a pre-installed JSON file.
     * Read the JWT token from the HTTP header.
     * Verify the JWT's signature with the public key in the JWKS.
     * Get the username out of the payload of the JWT token.
     * Support the following JSON Web Algorithms (JWA):
       HS256, HS384, HS512, RS256, RS384, RS512.
    
    We use the third-party library jwt-cpp to verify the JWT token. jwt-cpp
    is a header-only C++ library. It was added to native-toolchain.
    This patch modifies bootstrap_toolchain.py to download jwt-cpp from the
    toolchain S3 bucket, and modifies the CMake files to add jwt-cpp/include
    to the include path.
    
    Added BE unit tests for loading the JWKS file and verifying JWT tokens.
    Also added FE custom cluster tests for JWT authentication.
    
    Testing:
     - Passed core run.
    
    Change-Id: I6b71fa854c9ddc8ca882878853395e1eb866143c
    Reviewed-on: http://gerrit.cloudera.org:8080/17435
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 CMakeLists.txt                                     |   6 +
 be/CMakeLists.txt                                  |   1 +
 be/src/rpc/auth-provider.h                         |   6 +-
 be/src/rpc/authentication.cc                       | 103 ++-
 be/src/service/impala-server.cc                    |  17 +
 be/src/transport/THttpServer.cpp                   |  35 +-
 be/src/transport/THttpServer.h                     |  30 +-
 be/src/util/CMakeLists.txt                         |   3 +
 be/src/util/jwt-util-internal.h                    | 224 +++++++
 be/src/util/jwt-util-test.cc                       | 702 +++++++++++++++++++++
 be/src/util/jwt-util.cc                            | 595 +++++++++++++++++
 be/src/util/jwt-util.h                             |  91 +++
 be/src/util/webserver.cc                           |  78 ++-
 be/src/util/webserver.h                            |  14 +
 bin/bootstrap_toolchain.py                         |   2 +-
 bin/impala-config.sh                               |   2 +
 bin/rat_exclude_files.txt                          |   1 +
 cmake_modules/FindJwtCpp.cmake                     |  38 ++
 common/thrift/generate_error_codes.py              |   4 +
 common/thrift/metrics.json                         |  46 +-
 .../apache/impala/customcluster/JwtHttpTest.java   | 217 +++++++
 .../impala/customcluster/JwtWebserverTest.java     | 141 +++++
 .../apache/impala/customcluster/LdapHS2Test.java   |  75 +++
 .../impala/customcluster/LdapWebserverTest.java    |  52 ++
 testdata/jwt/jwks_rs256.json                       |   7 +
 25 files changed, 2464 insertions(+), 26 deletions(-)
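
Note on the JWT layout that the commit message relies on: a token is three base64url segments separated by dots, and the username claim lives in the JSON of the middle (payload) segment. The sketch below decodes that segment with the standard library only. It is illustrative, not the patch's implementation: the patch delegates decoding to jwt-cpp, `Base64UrlDecode` and `DecodeJwtPayload` are hypothetical helper names that do not appear in the patch, and no signature check is performed here.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of the patch): decodes base64url text, with or
// without '=' padding. Returns false on a character outside the alphabet.
bool Base64UrlDecode(const std::string& in, std::string* out) {
  out->clear();
  unsigned int buffer = 0;  // Holds the bits that have not been emitted yet.
  int bits = 0;             // Number of valid low-order bits in 'buffer'.
  for (char c : in) {
    if (c == '=') break;  // Padding marks the end of the data.
    unsigned int v;
    if (c >= 'A' && c <= 'Z') {
      v = c - 'A';
    } else if (c >= 'a' && c <= 'z') {
      v = c - 'a' + 26;
    } else if (c >= '0' && c <= '9') {
      v = c - '0' + 52;
    } else if (c == '-') {
      v = 62;  // base64url uses '-' where base64 uses '+'.
    } else if (c == '_') {
      v = 63;  // base64url uses '_' where base64 uses '/'.
    } else {
      return false;
    }
    buffer = (buffer << 6) | v;
    bits += 6;
    if (bits >= 8) {
      bits -= 8;
      out->push_back(static_cast<char>((buffer >> bits) & 0xFF));
      buffer &= (1u << bits) - 1;  // Keep only the bits not yet emitted.
    }
  }
  return true;
}

// Hypothetical helper (not part of the patch): extracts the middle segment of
// "<header>.<payload>.<signature>" and decodes it to its JSON text.
bool DecodeJwtPayload(const std::string& token, std::string* payload_json) {
  const std::size_t first = token.find('.');
  if (first == std::string::npos) return false;
  const std::size_t second = token.find('.', first + 1);
  if (second == std::string::npos) return false;
  return Base64UrlDecode(token.substr(first + 1, second - first - 1), payload_json);
}
```

For the token `aGVhZGVy.eyJ1c2VybmFtZSI6ImltcGFsYSJ9.c2ln`, `DecodeJwtPayload` yields the JSON text `{"username":"impala"}`. Reading the claim out of that JSON is left to a JSON parser.
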

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 18ce4e7..de769c5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -102,6 +102,7 @@ set_dep_root(GFLAGS)
 set_dep_root(GLOG)
 set_dep_root(GPERFTOOLS)
 set_dep_root(GTEST)
+set_dep_root(JWT_CPP)
 set_dep_root(LIBEV)
 set_dep_root(LIBUNWIND)
 set_dep_root(LLVM)
@@ -289,6 +290,11 @@ IMPALA_ADD_THIRDPARTY_LIB(zstd ${ZSTD_INCLUDE_DIR} ${ZSTD_STATIC_LIB} "")
 find_package(Re2 REQUIRED)
 IMPALA_ADD_THIRDPARTY_LIB(re2 ${RE2_INCLUDE_DIR} ${RE2_STATIC_LIB} "")
 
+# find jwt-cpp headers
+find_package(JwtCpp REQUIRED)
+include_directories(${JWT_CPP_INCLUDE_DIR})
+message(STATUS "jwt-cpp include dir: " ${JWT_CPP_INCLUDE_DIR})
+
 # find rapidjson headers
 find_package(RapidJson REQUIRED)
 include_directories(${RAPIDJSON_INCLUDE_DIR})
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 638f0a7..fd1a40c 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -338,6 +338,7 @@ set(CLANG_INCLUDE_FLAGS
   "-I${GLOG_INCLUDE_DIR}"
   "-I${GFLAGS_INCLUDE_DIR}"
   "-I${GTEST_INCLUDE_DIR}"
+  "-I${JWT_CPP_INCLUDE_DIR}"
   "-I${RAPIDJSON_INCLUDE_DIR}"
   "-I${AVRO_INCLUDE_DIR}"
   "-I${ORC_INCLUDE_DIR}"
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 84a6f23..30986ac 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -83,7 +83,7 @@ class AuthProvider {
 class SecureAuthProvider : public AuthProvider {
  public:
   SecureAuthProvider(bool is_internal)
-    : has_ldap_(false), has_saml_(false), is_internal_(is_internal) {}
+    : has_ldap_(false), has_saml_(false), has_jwt_(false), is_internal_(is_internal) {}
 
   /// Performs initialization of external state.
   /// If we're using ldap, set up appropriate certificate usage.
@@ -131,6 +131,8 @@ class SecureAuthProvider : public AuthProvider {
 
   void InitSaml() { has_saml_ = true; }
 
+  void InitJwt() { has_jwt_ = true; }
+
   /// Used for testing
   const std::string& principal() const { return principal_; }
   const std::string& service_name() const { return service_name_; }
@@ -144,6 +146,8 @@ class SecureAuthProvider : public AuthProvider {
 
   bool has_saml_;
 
+  bool has_jwt_;
+
   /// Hostname of this machine - if kerberos, derived from principal.  If there
   /// is no kerberos, but LDAP is used, then acquired via GetHostname().
   std::string hostname_;
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index a482145..b2071a1 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -57,6 +57,7 @@
 #include "util/coding-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
+#include "util/jwt-util.h"
 #include "util/ldap-util.h"
 #include "util/network-util.h"
 #include "util/os-util.h"
@@ -137,6 +138,31 @@ DEFINE_bool_hidden(saml2_allow_without_tls_debug_only, false,
 
 DECLARE_string(saml2_sp_callback_url);
 
+// If set, Impala supports authentication based on a JWT token passed in the HTTP
+// header.
+DEFINE_bool(jwt_token_auth, false,
+    "When true, read the JWT token out of the HTTP Header and extract user name from "
+    "the token payload.");
+// The last segment of a JWT is the signature, which is used to verify that the token was
+// signed by the sender and not altered in any way. By default, the signature of every
+// JWT token is validated. Disabling validation exposes a security risk.
+DEFINE_bool(jwt_validate_signature, true,
+    "When true, validate the signature of JWT token with pre-installed JWKS.");
+// A JWKS contains the public keys that the signing party publishes to clients that need
+// to validate signatures. It represents cryptographic keys as a JSON data structure.
+DEFINE_string(jwks_file_path, "",
+    "File path of the pre-installed JSON Web Key Set (JWKS) for JWT verification");
+// This specifies the custom claim in the JWT that contains the "username" for the
+// session.
+DEFINE_string(jwt_custom_claim_username, "username", "Custom claim 'username'");
+// If set, Impala allows JWT authentication on an unsecured channel.
+// JWT is only secure when used with TLS. But in some deployment scenarios, TLS is handled
+// by a proxy so that it does not show up as TLS to Impala.
+DEFINE_bool_hidden(jwt_allow_without_tls, false,
+    "When this configuration is set to true, Impala allows JWT authentication on "
+    "unsecure channel. This should be only enabled for testing, or development for which "
+    "TLS is handled by proxy.");
+
 namespace impala {
 
 // Sasl callbacks.  Why are these here?  Well, Sasl isn't that bright, and
@@ -525,6 +551,41 @@ bool BasicAuth(ThriftServer::ConnectionContext* connection_context,
   return false;
 }
 
+bool JWTTokenAuth(ThriftServer::ConnectionContext* connection_context,
+    const AuthenticationHash& hash, const string& token) {
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  Status status = JWTHelper::Decode(token, decoded_token);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error decoding JWT token received from: "
+               << TNetworkAddressToString(connection_context->network_address)
+               << " Error: " << status;
+    return false;
+  }
+  if (FLAGS_jwt_validate_signature) {
+    status = JWTHelper::GetInstance()->Verify(decoded_token.get());
+    if (!status.ok()) {
+      LOG(ERROR) << "Error verifying JWT token received from: "
+                 << TNetworkAddressToString(connection_context->network_address)
+                 << " Error: " << status;
+      return false;
+    }
+  }
+
+  DCHECK(!FLAGS_jwt_custom_claim_username.empty());
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(
+      decoded_token.get(), FLAGS_jwt_custom_claim_username, username);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error extracting username from JWT token received from: "
+               << TNetworkAddressToString(connection_context->network_address)
+               << " Error: " << status;
+    return false;
+  }
+  connection_context->username = username;
+  // TODO: cookies are not added, but are not needed right now
+  return true;
+}
+
 // Performs a step of SPNEGO auth for the HTTP transport and sets the username on
 // 'connection_context' if auth is successful. 'header_token' is the value from an
 // 'Authorization: Negotiate" header. Returns true if the step was successful and sets
@@ -1060,14 +1121,14 @@ Status SecureAuthProvider::Start() {
 Status SecureAuthProvider::GetServerTransportFactory(
     ThriftServer::TransportType underlying_transport_type, const std::string& server_name,
     MetricGroup* metrics, std::shared_ptr<TTransportFactory>* factory) {
-  DCHECK(!principal_.empty() || has_ldap_ || has_saml_);
+  DCHECK(!principal_.empty() || has_ldap_ || has_saml_ || has_jwt_);
 
   if (underlying_transport_type == ThriftServer::HTTP) {
     bool has_kerberos = !principal_.empty();
     bool use_cookies = FLAGS_max_cookie_lifetime_s > 0;
     bool check_trusted_domain = !FLAGS_trusted_domain.empty();
     factory->reset(new THttpServerTransportFactory(server_name, metrics, has_ldap_,
-        has_kerberos, use_cookies, check_trusted_domain, has_saml_));
+        has_kerberos, use_cookies, check_trusted_domain, has_saml_, has_jwt_));
     return Status::OK();
   }
 
@@ -1191,6 +1252,10 @@ void SecureAuthProvider::SetupConnectionContext(
         callbacks.validate_saml2_bearer_fn =
             std::bind(ValidateSaml2Bearer, connection_ptr.get(), hash_);
       }
+      if (has_jwt_) {
+        callbacks.jwt_token_auth_fn =
+            std::bind(JWTTokenAuth, connection_ptr.get(), hash_, std::placeholders::_1);
+      }
       http_input_transport->setCallbacks(callbacks);
       http_output_transport->setCallbacks(callbacks);
       socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
@@ -1286,6 +1351,20 @@ Status AuthManager::Init() {
     }
   }
 
+  bool use_jwt = FLAGS_jwt_token_auth;
+  if (use_jwt) {
+    if (!IsExternalTlsConfigured()) {
+      if (!FLAGS_jwt_allow_without_tls) {
+        return Status("JWT authentication should be only used with TLS enabled.");
+      }
+      LOG(WARNING) << "JWT authentication is used without TLS.";
+    }
+    if (FLAGS_jwt_custom_claim_username.empty()) {
+      return Status(
+          "JWT authentication requires jwt_custom_claim_username to be specified.");
+    }
+  }
+
   // Get all of the flag validation out of the way
   if (FLAGS_enable_ldap_auth) {
     RETURN_IF_ERROR(
@@ -1338,7 +1417,7 @@ Status AuthManager::Init() {
 
   if (IsInternalKerberosEnabled()) {
     // Initialize the auth provider first, in case validation of the principal fails.
-    SecureAuthProvider* sap = NULL;
+    SecureAuthProvider* sap = nullptr;
     internal_auth_provider_.reset(sap = new SecureAuthProvider(true));
     RETURN_IF_ERROR(sap->InitKerberos(kerberos_internal_principal));
     LOG(INFO) << "Internal communication is authenticated with Kerberos";
@@ -1352,7 +1431,7 @@ Status AuthManager::Init() {
   // principal or ldap tells us to use a SecureAuthProvider, and we fill in
   // details from there.
   if (FLAGS_enable_ldap_auth || external_kerberos_enabled) {
-    SecureAuthProvider* sap = NULL;
+    SecureAuthProvider* sap = nullptr;
     external_auth_provider_.reset(sap = new SecureAuthProvider(false));
     if (external_kerberos_enabled) {
       RETURN_IF_ERROR(sap->InitKerberos(kerberos_external_principal));
@@ -1366,15 +1445,25 @@ Status AuthManager::Init() {
       LOG(INFO) << "External communication can be also authenticated with SAML2 SSO";
       sap->InitSaml();
     }
+    if (use_jwt) {
+      LOG(INFO) << "External communication can be also authenticated with JWT";
+      sap->InitJwt();
+    }
   } else {
     external_auth_provider_.reset(new NoAuthProvider());
     LOG(INFO) << "External communication is not authenticated for binary protocols";
     if (use_saml) {
-      SecureAuthProvider* sap = NULL;
+      SecureAuthProvider* sap = nullptr;
       external_http_auth_provider_.reset(sap = new SecureAuthProvider(false));
       sap->InitSaml();
-      LOG(INFO) <<
-          "External communication is authenticated for hs2-http protocol with SAML2 SSO";
+      LOG(INFO) << "External communication is authenticated for hs2-http protocol with "
+                   "SAML2 SSO";
+    } else if (use_jwt) {
+      SecureAuthProvider* sap = nullptr;
+      external_http_auth_provider_.reset(sap = new SecureAuthProvider(false));
+      sap->InitJwt();
+      LOG(INFO)
+          << "External communication is authenticated for hs2-http protocol with JWT";
     } else {
       LOG(INFO) << "External communication is not authenticated for hs2-http protocol";
     }
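
The startup checks added to `AuthManager::Init()` in the hunk above reduce to a small decision: JWT authentication is rejected without TLS unless the hidden override flag is set, and the username claim name must not be empty. A minimal standalone sketch of that decision follows. `JwtFlags` and `ValidateJwtFlags` are hypothetical names introduced for illustration, `tls_configured` stands in for `IsExternalTlsConfigured()`, and the real code returns an Impala `Status` rather than a string.

```cpp
#include <string>

// Hypothetical stand-in for the gflags consulted by the patch; the defaults
// mirror the DEFINE_* defaults shown in authentication.cc.
struct JwtFlags {
  bool jwt_token_auth = false;
  bool jwt_allow_without_tls = false;
  std::string jwt_custom_claim_username = "username";
};

// Returns an empty string when the configuration is acceptable, otherwise the
// error text. 'tls_configured' stands in for IsExternalTlsConfigured().
std::string ValidateJwtFlags(const JwtFlags& flags, bool tls_configured) {
  if (!flags.jwt_token_auth) return "";  // JWT disabled: nothing to validate.
  if (!tls_configured && !flags.jwt_allow_without_tls) {
    return "JWT authentication should be only used with TLS enabled.";
  }
  if (flags.jwt_custom_claim_username.empty()) {
    return "JWT authentication requires jwt_custom_claim_username to be specified.";
  }
  return "";
}
```

With the default flags the function accepts any configuration because JWT is off. Turning on `jwt_token_auth` without TLS fails unless `jwt_allow_without_tls` is also set, in which case the patch only logs a warning.
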
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a48f7fe..355b835 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -87,6 +87,7 @@
 #include "util/error-util.h"
 #include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
+#include "util/jwt-util.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
@@ -343,6 +344,11 @@ DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
     "admission service, if enabled. Heartbeats are used to ensure resources are properly "
     "accounted for even if rpcs to the admission service occasionally fail.");
 
+// Flags for JWT token based authentication.
+DECLARE_bool(jwt_token_auth);
+DECLARE_bool(jwt_validate_signature);
+DECLARE_string(jwks_file_path);
+
 namespace {
 using namespace impala;
 
@@ -2869,6 +2875,17 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
     LOG(INFO) << "Initialized executor Impala server on "
               << TNetworkAddressToString(exec_env_->configured_backend_address());
   } else {
+    // Load JWKS from file if validation for signature of JWT token is enabled.
+    if (FLAGS_jwt_token_auth && FLAGS_jwt_validate_signature) {
+      if (!FLAGS_jwks_file_path.empty()) {
+        RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_file_path));
+      } else {
+        LOG(ERROR) << "JWKS file is not specified when the validation of JWT signature "
+                   << "is enabled.";
+        return Status("JWKS file is not specified");
+      }
+    }
+
     // Initialize the client servers.
     shared_ptr<ImpalaServer> handler = shared_from_this();
     if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0)) {
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index af12105..f4f6e1e 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -46,12 +46,13 @@ using strings::Substitute;
 
 THttpServerTransportFactory::THttpServerTransportFactory(const std::string server_name,
     impala::MetricGroup* metrics, bool has_ldap, bool has_kerberos, bool use_cookies,
-    bool check_trusted_domain, bool has_saml)
+    bool check_trusted_domain, bool has_saml, bool has_jwt)
   : has_ldap_(has_ldap),
     has_kerberos_(has_kerberos),
     use_cookies_(use_cookies),
     check_trusted_domain_(check_trusted_domain),
     has_saml_(has_saml),
+    has_jwt_(has_jwt),
     metrics_enabled_(metrics != nullptr) {
   if (metrics_enabled_) {
     if (has_ldap_) {
@@ -82,18 +83,25 @@ THttpServerTransportFactory::THttpServerTransportFactory(const std::string serve
       http_metrics_.total_saml_auth_failure_ =
           metrics->AddCounter(Substitute("$0.total-saml-auth-failure", server_name), 0);
     }
+    if (has_jwt_) {
+      http_metrics_.total_jwt_token_auth_success_ = metrics->AddCounter(
+          Substitute("$0.total-jwt-token-auth-success", server_name), 0);
+      http_metrics_.total_jwt_token_auth_failure_ = metrics->AddCounter(
+          Substitute("$0.total-jwt-token-auth-failure", server_name), 0);
+    }
   }
 }
 
 THttpServer::THttpServer(std::shared_ptr<TTransport> transport, bool has_ldap,
     bool has_kerberos, bool has_saml, bool use_cookies, bool check_trusted_domain,
-    bool metrics_enabled, HttpMetrics* http_metrics)
+    bool has_jwt, bool metrics_enabled, HttpMetrics* http_metrics)
   : THttpTransport(transport),
     has_ldap_(has_ldap),
     has_kerberos_(has_kerberos),
     has_saml_(has_saml),
     use_cookies_(use_cookies),
     check_trusted_domain_(check_trusted_domain),
+    has_jwt_(has_jwt),
     metrics_enabled_(metrics_enabled),
     http_metrics_(http_metrics) {}
 
@@ -127,7 +135,7 @@ void THttpServer::parseHeader(char* header) {
     contentLength_ = atoi(value);
   } else if (strncmp(header, "X-Forwarded-For", sz) == 0) {
     origin_ = value;
-  } else if ((has_ldap_ || has_kerberos_ || has_saml_)
+  } else if ((has_ldap_ || has_kerberos_ || has_saml_ || has_jwt_)
       && THRIFT_strncasecmp(header, "Authorization", sz) == 0) {
     auth_value_ = string(value);
   } else if (use_cookies_ && THRIFT_strncasecmp(header, "Cookie", sz) == 0) {
@@ -209,7 +217,7 @@ bool THttpServer::parseStatusLine(char* status) {
 }
 
 void THttpServer::headersDone() {
-  if (!has_ldap_ && !has_kerberos_ && !has_saml_) {
+  if (!has_ldap_ && !has_kerberos_ && !has_saml_ && !has_jwt_) {
     // We don't need to authenticate.
     resetAuthState();
     return;
@@ -240,6 +248,25 @@ void THttpServer::headersDone() {
     }
   }
 
+  if (!authorized && has_jwt_ && !auth_value_.empty()
+      && auth_value_.find('.') != string::npos) {
+    // Check Authorization header with the Bearer authentication scheme as:
+    // Authorization: Bearer <token>
+    // JWT contains at least one period ('.'). A well-formed JWT consists of three
+    // concatenated Base64url-encoded strings, separated by dots (.).
+    StripWhiteSpace(&auth_value_);
+    string jwt_token;
+    bool got_bearer_auth = TryStripPrefixString(auth_value_, "Bearer ", &jwt_token);
+    if (got_bearer_auth) {
+      if (callbacks_.jwt_token_auth_fn(jwt_token)) {
+        authorized = true;
+        if (metrics_enabled_) http_metrics_->total_jwt_token_auth_success_->Increment(1);
+      } else {
+        if (metrics_enabled_) http_metrics_->total_jwt_token_auth_failure_->Increment(1);
+      }
+    }
+  }
+
   if (!authorized && has_saml_) {
     bool fallback_to_other_auths = true;
     if (saml_port_ != -1) {
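
The header handling added to `THttpServer::headersDone()` above can be sketched in isolation: trim the `Authorization` value, require the `Bearer ` scheme, and treat the remainder as a JWT candidate only if it contains a dot. `ExtractBearerJwt` is a hypothetical helper written for this note. The patch itself uses `StripWhiteSpace` and `TryStripPrefixString`, and it tests for the dot on the whole header value before stripping the prefix, whereas this sketch tests the token itself.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of the patch): returns true and sets 'token'
// when 'header_value' has the form "Bearer <token>" and the token contains at
// least one '.', as any JWT does.
bool ExtractBearerJwt(const std::string& header_value, std::string* token) {
  const std::string whitespace = " \t\r\n";
  const std::size_t begin = header_value.find_first_not_of(whitespace);
  if (begin == std::string::npos) return false;  // Empty or all whitespace.
  const std::size_t end = header_value.find_last_not_of(whitespace);
  const std::string trimmed = header_value.substr(begin, end - begin + 1);

  const std::string prefix = "Bearer ";
  if (trimmed.compare(0, prefix.size(), prefix) != 0) return false;

  const std::string candidate = trimmed.substr(prefix.size());
  if (candidate.find('.') == std::string::npos) return false;
  *token = candidate;
  return true;
}
```

A value such as `Basic dXNlcjpwdw==` is rejected here, which corresponds to the patch leaving such headers to the other authentication mechanisms.
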
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index 3c26e7e..3407d3e 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -54,6 +54,9 @@ struct HttpMetrics {
 
   impala::IntCounter* total_saml_auth_success_ = nullptr;
   impala::IntCounter* total_saml_auth_failure_ = nullptr;
+
+  impala::IntCounter* total_jwt_token_auth_success_ = nullptr;
+  impala::IntCounter* total_jwt_token_auth_failure_ = nullptr;
 };
 
 /*
@@ -119,11 +122,17 @@ public:
     // SAML2 SSO.
     std::function<impala::TWrappedHttpRequest*()> init_wrapped_http_request_fn =
         [&]() { return (impala::TWrappedHttpRequest*) NULL; };
+
+    // Function that takes the JWT token from the header, and returns true
+    // if verification for the token is successful.
+    std::function<bool(const std::string&)> jwt_token_auth_fn = [&](const std::string&) {
+      return false;
+    };
   };
 
   THttpServer(std::shared_ptr<TTransport> transport, bool has_ldap, bool has_kerberos,
-      bool has_saml, bool use_cookies, bool check_trusted_domain, bool metrics_enabled,
-      HttpMetrics* http_metrics);
+      bool has_saml, bool use_cookies, bool check_trusted_domain, bool has_jwt,
+      bool metrics_enabled, HttpMetrics* http_metrics);
 
   virtual ~THttpServer();
 
@@ -147,9 +156,9 @@ protected:
   void resetAuthState();
  private:
   // If either of the following is true, a '401 - Unauthorized' will be returned to the
-  // client on requests that do not contain a valid 'Authorization' of SAML SSO related
-  // header. If 'has_ldap_' is true, 'Basic' auth headers will be processed, and if
-  // 'has_kerberos_' is true 'Negotiate' auth headers will be processed.
+  // client on requests that do not contain a valid 'Authorization' of SAML SSO or JWT
+  // related header. If 'has_ldap_' is true, 'Basic' auth headers will be processed, and
+  // if 'has_kerberos_' is true 'Negotiate' auth headers will be processed.
   bool has_ldap_ = false;
   bool has_kerberos_ = false;
 
@@ -186,6 +195,9 @@ protected:
   // trusted domain.
   bool check_trusted_domain_ = false;
 
+  // If set, authentication based on a JWT token is supported.
+  bool has_jwt_ = false;
+
   bool metrics_enabled_ = false;
   HttpMetrics* http_metrics_ = nullptr;
 
@@ -203,13 +215,14 @@ public:
 
  THttpServerTransportFactory(const std::string server_name, impala::MetricGroup* metrics,
      bool has_ldap, bool has_kerberos, bool use_cookies, bool check_trusted_domain,
-     bool has_saml);
+     bool has_saml, bool has_jwt);
 
  virtual ~THttpServerTransportFactory() {}
 
  virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) {
-   return std::shared_ptr<TTransport>(new THttpServer(trans, has_ldap_, has_kerberos_,
-       has_saml_, use_cookies_, check_trusted_domain_, metrics_enabled_, &http_metrics_));
+   return std::shared_ptr<TTransport>(
+       new THttpServer(trans, has_ldap_, has_kerberos_, has_saml_, use_cookies_,
+           check_trusted_domain_, has_jwt_, metrics_enabled_, &http_metrics_));
   }
 
  private:
@@ -218,6 +231,7 @@ public:
   bool use_cookies_ = false;
   bool check_trusted_domain_ = false;
   bool has_saml_ = false;
+  bool has_jwt_ = false;
 
   // Metrics for every transport produced by this factory.
   bool metrics_enabled_ = false;
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 25aae66..168112a 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -63,6 +63,7 @@ set(UTIL_SRCS
   impala-bloom-filter-buffer-allocator.cc
   jni-util.cc
   json-util.cc
+  jwt-util.cc
   ldap-util.cc
   ldap-search-bind.cc
   ldap-simple-bind.cc
@@ -129,6 +130,7 @@ add_library(UtilTests STATIC
   fixed-size-hash-table-test.cc
   hdfs-util-test.cc
   hdr-histogram-test.cc
+  jwt-util-test.cc
   logging-support-test.cc
   metrics-test.cc
   min-max-filter-test.cc
@@ -198,6 +200,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdr-histogram-test HdrHistogramTest.*)
 # internal-queue-test has a non-standard main(), so it needs a small amount of thought
 # to use a unified executable
 ADD_BE_LSAN_TEST(internal-queue-test)
+ADD_UNIFIED_BE_LSAN_TEST(jwt-util-test "JwtUtilTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(logging-support-test "LoggingSupport.*")
 ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
diff --git a/be/src/util/jwt-util-internal.h b/be/src/util/jwt-util-internal.h
new file mode 100644
index 0000000..568326d
--- /dev/null
+++ b/be/src/util/jwt-util-internal.h
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_JWT_UTIL_INTERNAL_H
+#define IMPALA_JWT_UTIL_INTERNAL_H
+
+#include <string>
+#include <unordered_map>
+
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wreserved-id-macro"
+#pragma clang diagnostic ignored "-Wunused-private-field"
+// picojson/picojson.h, which is included by jwt-cpp/jwt.h, defines __STDC_FORMAT_MACROS
+// without checking if it's already defined, so undefine the macro here to avoid a
+// re-definition error. Also need to hide warning "macro name is a reserved identifier".
+#ifdef __STDC_FORMAT_MACROS
+#undef __STDC_FORMAT_MACROS
+#endif
+#include <jwt-cpp/jwt.h>
+#pragma clang diagnostic pop
+
+#include "common/logging.h"
+#include "common/status.h"
+
+namespace impala {
+
+using DecodedJWT = jwt::decoded_jwt<jwt::picojson_traits>;
+using JWTVerifier = jwt::verifier<jwt::default_clock, jwt::picojson_traits>;
+
+/// Key-Value map for parsing Json keys.
+typedef std::unordered_map<std::string, std::string> JsonKVMap;
+
+class JsonWebKeySet;
+
+/// JWTPublicKey:
+/// Represents a cryptographic public key for JSON Web Token (JWT) verification.
+class JWTPublicKey {
+ public:
+  JWTPublicKey(std::string algorithm, std::string pub_key)
+    : verifier_(jwt::verify()), algorithm_(algorithm), public_key_(pub_key) {}
+
+  /// Verify the given decoded token.
+  Status Verify(const DecodedJWT& decoded_jwt, const std::string& algorithm) const;
+
+  const std::string& get_algorithm() const { return algorithm_; }
+  const std::string& get_key() const { return public_key_; }
+
+ protected:
+  /// JWT Verifier.
+  JWTVerifier verifier_;
+
+ private:
+  /// Signing Algorithm:
+  /// Currently supports the following JSON Web Algorithms (JWA):
+  /// HS256, HS384, HS512, RS256, RS384, and RS512.
+  const std::string algorithm_;
+  /// Public key value:
+  /// For EC and RSA families of algorithms, it's the public key converted to PEM-encoded
+  /// format since jwt-cpp APIs only accept EC/RSA public keys in PEM-encoded format.
+  /// For HMAC-SHA2, it's the Octet Sequence key representing the secret key.
+  const std::string public_key_;
+};
+
+/// JWT Public Key for HS256.
+/// HS256: HMAC using SHA-256.
+class HS256JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw JWT exception if failed to initialize the verifier.
+  HS256JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs256(pub_key));
+  }
+};
+
+/// JWT Public Key for HS384.
+/// HS384: HMAC using SHA-384.
+class HS384JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw exception if failed to initialize the JWT verifier.
+  HS384JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs384(pub_key));
+  }
+};
+
+/// JWT Public Key for HS512.
+/// HS512: HMAC using SHA-512.
+class HS512JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throws a JWT exception if the verifier fails to initialize.
+  HS512JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs512(pub_key));
+  }
+};
+
+/// JWT Public Key for RS256.
+/// RS256: RSASSA-PKCS1-v1_5 using SHA-256.
+class RS256JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throws a JWT exception if the verifier fails to initialize.
+  RS256JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs256(pub_key, "", "", ""));
+  }
+};
+
+/// JWT Public Key for RS384.
+/// RS384: RSASSA-PKCS1-v1_5 using SHA-384.
+class RS384JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throws a JWT exception if the verifier fails to initialize.
+  RS384JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs384(pub_key, "", "", ""));
+  }
+};
+
+/// JWT Public Key for RS512.
+/// RS512: RSASSA-PKCS1-v1_5 using SHA-512.
+class RS512JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throws a JWT exception if the verifier fails to initialize.
+  RS512JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs512(pub_key, "", "", ""));
+  }
+};
+
+/// Constructs an HS JWTPublicKey from the JWK.
+class HSJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+};
+
+/// Constructs an RSA JWTPublicKey from the JWK.
+class RSAJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+
+ private:
+  /// Converts an RSA public key from JWK format to PEM-encoded format using OpenSSL
+  /// APIs.
+  static bool ConvertJwkToPem(
+      const std::string& base64_n, const std::string& base64_e, std::string& pub_key);
+};
+
+/// JSON Web Key Set (JWKS) conveys the public keys used by the signing party to the
+/// clients that need to validate signatures. It represents a cryptographic key set in
+/// a JSON data structure.
+/// This class works as a JWT provider: it loads the JWKS from a file, stores the keys
+/// in an internal map for each family of algorithms, and provides an API to retrieve
+/// a key by key ID.
+/// Init() should be called during the initialization of the daemon. The instance is
+/// not modified after Init() returns, so the class is thread safe.
+class JsonWebKeySet {
+ public:
+  explicit JsonWebKeySet() {}
+
+  /// Map from a key ID (kid) to a JWTPublicKey.
+  typedef std::unordered_map<std::string, std::unique_ptr<JWTPublicKey>> JWTPublicKeyMap;
+
+  /// Load JWKS stored in a JSON file. Returns an error if problems were encountered
+  /// while parsing/constructing the JSON Web Keys. If no keys were given in the file,
+  /// the internal maps will be empty.
+  Status Init(const std::string& jwks_file_path);
+
+  /// Looks up the key ID in the internal key maps and returns the key if the lookup
+  /// was successful, otherwise returns nullptr.
+  const JWTPublicKey* LookupRSAPublicKey(const std::string& kid) const;
+  const JWTPublicKey* LookupHSKey(const std::string& kid) const;
+
+  /// Return number of keys for HS.
+  int GetHSKeyNum() const { return hs_key_map_.size(); }
+  /// Return number of keys for RSA.
+  int GetRSAPublicKeyNum() const { return rsa_pub_key_map_.size(); }
+
+  /// Return all keys for HS.
+  const JWTPublicKeyMap* GetAllHSKeys() const { return &hs_key_map_; }
+  /// Return all keys for RSA.
+  const JWTPublicKeyMap* GetAllRSAPublicKeys() const { return &rsa_pub_key_map_; }
+
+  /// Return TRUE if there is no key.
+  bool IsEmpty() const { return hs_key_map_.empty() && rsa_pub_key_map_.empty(); }
+
+ private:
+  friend class JWKSetParser;
+
+  /// The following two functions are called inside Init().
+  /// Add an RSA public key.
+  void AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key);
+  /// Add a HS key.
+  void AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key);
+
+  /// Note: According to section 4.5 of RFC 7517 (JSON Web Key), different keys might use
+  /// the same "kid" value if they have different "kty" (key type) values but are
+  /// considered to be equivalent alternatives by the application using them. So keys
+  /// for each "kty" are saved in different maps.
+
+  /// Octet Sequence keys for HS256 (HMAC using SHA-256), HS384 and HS512.
+  /// kty (key type): oct.
+  JWTPublicKeyMap hs_key_map_;
+  /// Public keys for RSA family of algorithms: RS256, RS384, RS512.
+  /// kty (key type): RSA.
+  JWTPublicKeyMap rsa_pub_key_map_;
+};
+
+} // namespace impala
+
+#endif
diff --git a/be/src/util/jwt-util-test.cc b/be/src/util/jwt-util-test.cc
new file mode 100644
index 0000000..c45db48
--- /dev/null
+++ b/be/src/util/jwt-util-test.cc
@@ -0,0 +1,702 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdio> // file stuff
+#include <gutil/strings/substitute.h>
+
+#include "jwt-util-internal.h"
+#include "jwt-util.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+using std::string;
+
+std::string rsa_priv_key_pem = R"(-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC4ZtdaIrd1BPIJ
+tfnF0TjIK5inQAXZ3XlCrUlJdP+XHwIRxdv1FsN12XyMYO/6ymLmo9ryoQeIrsXB
+XYqlET3zfAY+diwCb0HEsVvhisthwMU4gZQu6TYW2s9LnXZB5rVtcBK69hcSlA2k
+ZudMZWxZcj0L7KMfO2rIvaHw/qaVOE9j0T257Z8Kp2CLF9MUgX0ObhIsdumFRLaL
+DvDUmBPr2zuh/34j2XmWwn1yjN/WvGtdfhXW79Ki1S40HcWnygHgLV8sESFKUxxQ
+mKvPUTwDOIwLFL5WtE8Mz7N++kgmDcmWMCHc8kcOIu73Ta/3D4imW7VbKgHZo9+K
+3ESFE3RjAgMBAAECggEBAJTEIyjMqUT24G2FKiS1TiHvShBkTlQdoR5xvpZMlYbN
+tVWxUmrAGqCQ/TIjYnfpnzCDMLhdwT48Ab6mQJw69MfiXwc1PvwX1e9hRscGul36
+ryGPKIVQEBsQG/zc4/L2tZe8ut+qeaK7XuYrPp8bk/X1e9qK5m7j+JpKosNSLgJj
+NIbYsBkG2Mlq671irKYj2hVZeaBQmWmZxK4fw0Istz2WfN5nUKUeJhTwpR+JLUg4
+ELYYoB7EO0Cej9UBG30hbgu4RyXA+VbptJ+H042K5QJROUbtnLWuuWosZ5ATldwO
+u03dIXL0SH0ao5NcWBzxU4F2sBXZRGP2x/jiSLHcqoECgYEA4qD7mXQpu1b8XO8U
+6abpKloJCatSAHzjgdR2eRDRx5PMvloipfwqA77pnbjTUFajqWQgOXsDTCjcdQui
+wf5XAaWu+TeAVTytLQbSiTsBhrnoqVrr3RoyDQmdnwHT8aCMouOgcC5thP9vQ8Us
+rVdjvRRbnJpg3BeSNimH+u9AHgsCgYEA0EzcbOltCWPHRAY7B3Ge/AKBjBQr86Kv
+TdpTlxePBDVIlH+BM6oct2gaSZZoHbqPjbq5v7yf0fKVcXE4bSVgqfDJ/sZQu9Lp
+PTeV7wkk0OsAMKk7QukEpPno5q6tOTNnFecpUhVLLlqbfqkB2baYYwLJR3IRzboJ
+FQbLY93E8gkCgYB+zlC5VlQbbNqcLXJoImqItgQkkuW5PCgYdwcrSov2ve5r/Acz
+FNt1aRdSlx4176R3nXyibQA1Vw+ztiUFowiP9WLoM3PtPZwwe4bGHmwGNHPIfwVG
+m+exf9XgKKespYbLhc45tuC08DATnXoYK7O1EnUINSFJRS8cezSI5eHcbQKBgQDC
+PgqHXZ2aVftqCc1eAaxaIRQhRmY+CgUjumaczRFGwVFveP9I6Gdi+Kca3DE3F9Pq
+PKgejo0SwP5vDT+rOGHN14bmGJUMsX9i4MTmZUZ5s8s3lXh3ysfT+GAhTd6nKrIE
+kM3Nh6HWFhROptfc6BNusRh1kX/cspDplK5x8EpJ0QKBgQDWFg6S2je0KtbV5PYe
+RultUEe2C0jYMDQx+JYxbPmtcopvZQrFEur3WKVuLy5UAy7EBvwMnZwIG7OOohJb
+vkSpADK6VPn9lbqq7O8cTedEHttm6otmLt8ZyEl3hZMaL3hbuRj6ysjmoFKx6CrX
+rK0/Ikt5ybqUzKCMJZg2VKGTxg==
+-----END PRIVATE KEY-----)";
+std::string rsa_pub_key_pem = R"(-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuGbXWiK3dQTyCbX5xdE4
+yCuYp0AF2d15Qq1JSXT/lx8CEcXb9RbDddl8jGDv+spi5qPa8qEHiK7FwV2KpRE9
+83wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qea1bXASuvYXEpQNpGbnTGVs
+WXI9C+yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTFIF9Dm4SLHbphUS2iw7w1JgT
+69s7of9+I9l5lsJ9cozf1rxrXX4V1u/SotUuNB3Fp8oB4C1fLBEhSlMcUJirz1E8
+AziMCxS+VrRPDM+zfvpIJg3JljAh3PJHDiLu902v9w+Iplu1WyoB2aPfitxEhRN0
+YwIDAQAB
+-----END PUBLIC KEY-----)";
+std::string rsa_pub_key_jwk_n =
+    "uGbXWiK3dQTyCbX5xdE4yCuYp0AF2d15Qq1JSXT_lx8CEcXb9RbDddl8jGDv-sp"
+    "i5qPa8qEHiK7FwV2KpRE983wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qe"
+    "a1bXASuvYXEpQNpGbnTGVsWXI9C-yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTF"
+    "IF9Dm4SLHbphUS2iw7w1JgT69s7of9-I9l5lsJ9cozf1rxrXX4V1u_SotUuNB3F"
+    "p8oB4C1fLBEhSlMcUJirz1E8AziMCxS-VrRPDM-zfvpIJg3JljAh3PJHDiLu902"
+    "v9w-Iplu1WyoB2aPfitxEhRN0Yw";
+std::string rsa_pub_key_jwk_e = "AQAB";
+std::string rsa_invalid_pub_key_jwk_n =
+    "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46"
+    "rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vd"
+    "i_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0i"
+    "wzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKh"
+    "CTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-X"
+    "kiCcrkyS1cnghnllh-LCwQu1sYw";
+
+std::string rsa512_priv_key_pem = R"(-----BEGIN RSA PRIVATE KEY-----
+MIICWwIBAAKBgQDdlatRjRjogo3WojgGHFHYLugdUWAY9iR3fy4arWNA1KoS8kVw
+33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQsHUfQrSDv+MuSUMAe8jzKE4qW
++jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5Do2kQ+X5xK9cipRgEKwIDAQAB
+AoGAD+onAtVye4ic7VR7V50DF9bOnwRwNXrARcDhq9LWNRrRGElESYYTQ6EbatXS
+3MCyjjX2eMhu/aF5YhXBwkppwxg+EOmXeh+MzL7Zh284OuPbkglAaGhV9bb6/5Cp
+uGb1esyPbYW+Ty2PC0GSZfIXkXs76jXAu9TOBvD0ybc2YlkCQQDywg2R/7t3Q2OE
+2+yo382CLJdrlSLVROWKwb4tb2PjhY4XAwV8d1vy0RenxTB+K5Mu57uVSTHtrMK0
+GAtFr833AkEA6avx20OHo61Yela/4k5kQDtjEf1N0LfI+BcWZtxsS3jDM3i1Hp0K
+Su5rsCPb8acJo5RO26gGVrfAsDcIXKC+bQJAZZ2XIpsitLyPpuiMOvBbzPavd4gY
+6Z8KWrfYzJoI/Q9FuBo6rKwl4BFoToD7WIUS+hpkagwWiz+6zLoX1dbOZwJACmH5
+fSSjAkLRi54PKJ8TFUeOP15h9sQzydI8zJU+upvDEKZsZc/UhT/SySDOxQ4G/523
+Y0sz/OZtSWcol/UMgQJALesy++GdvoIDLfJX5GBQpuFgFenRiRDabxrE9MNUZ2aP
+FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
+-----END RSA PRIVATE KEY-----)";
+std::string rsa512_pub_key_pem = R"(-----BEGIN PUBLIC KEY-----
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDdlatRjRjogo3WojgGHFHYLugd
+UWAY9iR3fy4arWNA1KoS8kVw33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQs
+HUfQrSDv+MuSUMAe8jzKE4qW+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5D
+o2kQ+X5xK9cipRgEKwIDAQAB
+-----END PUBLIC KEY-----)";
+std::string rsa512_pub_key_jwk_n =
+    "3ZWrUY0Y6IKN1qI4BhxR2C7oHVFgGPYkd38uGq1jQNSqEvJFcN93CYm16_G78FA"
+    "FKWqwsJb3Wx-nbxDn6LtP4AhULB1H0K0g7_jLklDAHvI8yhOKlvoyvsUFPWtNxl"
+    "Jyh5JJXvkNKV_4Oo12e69f8QCuQ6NpEPl-cSvXIqUYBCs";
+std::string rsa512_pub_key_jwk_e = "AQAB";
+std::string rsa512_invalid_pub_key_jwk_n =
+    "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46"
+    "rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vd"
+    "i_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0i"
+    "wzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKh"
+    "CTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-X"
+    "kiCcrkyS1cnghnllh-LCwQu1sYw";
+
+std::string kid_1 = "public:c424b67b-fe28-45d7-b015-f79da50b5b21";
+std::string kid_2 = "public:9b9d0b47-b9ed-4ba6-9180-52fc5b161a3a";
+
+std::string jwks_hs_file_format = R"(
+{
+  "keys": [
+    { "kty": "oct", "kid": "$0", "alg": "$1", "k": "$2" }
+  ]
+})";
+
+std::string jwks_rsa_file_format = R"(
+{
+  "keys": [
+    { "kty": "RSA", "kid": "$0", "alg": "$1", "n": "$2", "e": "$3" },
+    { "kty": "RSA", "kid": "$4", "alg": "$5", "n": "$6", "e": "$7" }
+  ]
+})";
+
+/// Utility class for creating a file that will be automatically deleted upon test
+/// completion.
+class TempTestDataFile {
+ public:
+  // Creates a temporary file with the specified contents.
+  TempTestDataFile(const std::string& contents);
+
+  ~TempTestDataFile() { Delete(); }
+
+  /// Returns the absolute path to the file.
+  const std::string& Filename() const { return name_; }
+
+ private:
+  std::string name_;
+  bool deleted_;
+
+  // Delete this temporary file
+  void Delete();
+};
+
+TempTestDataFile::TempTestDataFile(const std::string& contents)
+  : name_("/tmp/jwks_XXXXXX"), deleted_(false) {
+  int fd = mkstemp(&name_[0]);
+  if (fd == -1) {
+    std::cout << "Error creating temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+  if (close(fd) != 0) {
+    std::cout << "Error closing temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+
+  FILE* handle = fopen(name_.c_str(), "w");
+  if (handle == nullptr) {
+    std::cout << "Error opening temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+  int status = fputs(contents.c_str(), handle);
+  if (status < 0) {
+    std::cout << "Error writing to temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+  status = fclose(handle);
+  if (status != 0) {
+    std::cout << "Error closing temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+}
+
+void TempTestDataFile::Delete() {
+  if (deleted_) return;
+  deleted_ = true;
+  if (remove(name_.c_str()) != 0) {
+    std::cout << "Error deleting temp file; " << strerror(errno) << std::endl;
+    abort();
+  }
+}
+
+TEST(JwtUtilTest, LoadJwksFile) {
+  // Load JWKS from file.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_FALSE(jwks->IsEmpty());
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ("rs256", key1->get_algorithm());
+  ASSERT_EQ(rsa_pub_key_pem, key1->get_key());
+
+  std::string non_existing_kid("public:c424b67b-fe28-45d7-b015-f79da5-xxxxx");
+  const JWTPublicKey* key3 = jwks->LookupRSAPublicKey(non_existing_kid);
+  ASSERT_FALSE(key3 != nullptr);
+}
+
+TEST(JwtUtilTest, LoadInvalidJwksFiles) {
+  // JWK without kid.
+  std::unique_ptr<TempTestDataFile> jwks_file(new TempTestDataFile(
+      "{"
+      "  \"keys\": ["
+      "    {"
+      "      \"use\": \"sig\","
+      "      \"kty\": \"RSA\","
+      "      \"alg\": \"RS256\","
+      "      \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
+      "      \"e\": \"AQAB\""
+      "    }"
+      "  ]"
+      "}"));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
+      << " Actual error: " << status.msg().msg();
+  ASSERT_TRUE(status.GetDetail().find("'kid' property is required") != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+  ASSERT_TRUE(jwt_helper.GetJWKS()->IsEmpty());
+
+  // Invalid JSON format, missing "]" and "}".
+  jwks_file.reset(new TempTestDataFile(
+      "{"
+      "  \"keys\": ["
+      "    {"
+      "      \"use\": \"sig\","
+      "      \"kty\": \"RSA\","
+      "      \"kid\": \"public:c424b67b-fe28-45d7-b015-f79da50b5b21\","
+      "      \"alg\": \"RS256\","
+      "      \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
+      "      \"e\": \"AQAB\""
+      "}"));
+  status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find("Missing a comma or ']' after an array element")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+
+  // JWKS with empty key id.
+  jwks_file.reset(new TempTestDataFile(
+      Substitute(jwks_rsa_file_format, "", "RS256", rsa_pub_key_jwk_n, rsa_pub_key_jwk_e,
+          "", "RS256", rsa_invalid_pub_key_jwk_n, rsa_pub_key_jwk_e)));
+  status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
+      << " Actual error: " << status.msg().msg();
+  ASSERT_TRUE(status.GetDetail().find("'kid' property must be a non-empty string")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+
+  // JWKS with empty key value.
+  jwks_file.reset(new TempTestDataFile(
+      Substitute(jwks_rsa_file_format, kid_1, "RS256", "", "", kid_2, "RS256", "", "")));
+  status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
+      << " Actual error: " << status.msg().msg();
+  ASSERT_TRUE(status.GetDetail().find("'n' and 'e' properties must be a non-empty string")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtHS256) {
+  // Cryptographic algorithm: HS256.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret = "Yx57JSBzhGFDgDj19CabRpH/+kiaKqI6UZI6lDunQKw=";
+  TempTestDataFile jwks_file(
+      Substitute(jwks_hs_file_format, kid_1, "HS256", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  EXPECT_OK(status);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(key1->get_key(), shared_secret);
+
+  // Create a JWT token and sign it with HS256.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS256")
+                   .set_key_id(kid_1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs256(shared_secret));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtHS384) {
+  // Cryptographic algorithm: HS384.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret =
+      "TlqmKRc2PNQJXTC3Go7eAadwPxA7x9byyXCi5I8tSvxrE77tYbuF5pfZAyswrkou";
+  TempTestDataFile jwks_file(
+      Substitute(jwks_hs_file_format, kid_1, "HS384", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  EXPECT_OK(status);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(key1->get_key(), shared_secret);
+
+  // Create a JWT token and sign it with HS384.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS384")
+                   .set_key_id(kid_1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs384(shared_secret));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtHS512) {
+  // Cryptographic algorithm: HS512.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret = "ywc6DN7+iRw1E5HOqzvrsYodykSLFutT28KN3bJnLZcZpPCNjn0b6gbMfXPcxeY"
+                         "VyuWWGDxh6gCDwPMejbuEEg==";
+  TempTestDataFile jwks_file(
+      Substitute(jwks_hs_file_format, kid_1, "HS512", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  EXPECT_OK(status);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(key1->get_key(), shared_secret);
+
+  // Create a JWT token and sign it with HS512.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS512")
+                   .set_key_id(kid_1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs512(shared_secret));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS256) {
+  // Cryptographic algorithm: RS256.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(rsa_pub_key_pem, key1->get_key());
+
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id(kid_1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  ASSERT_EQ(
+      "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1mNzlkYTUwYj"
+      "ViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoiaW1wYWxhIn0.OW5H2SClL"
+      "lsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI02W69TZNat7DYcC86A_ogLMfNXagHj"
+      "lMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU554kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2"
+      "y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgcbZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO"
+      "25b5n4fryfKasSgaZfmk0CoOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKs"
+      "Pk0bg",
+      token);
+
+  // Verify the JWT token with jwt-cpp APIs directly.
+  auto jwt_decoded_token = jwt::decode(token);
+  auto verifier = jwt::verify()
+                      .allow_algorithm(jwt::algorithm::rs256(rsa_pub_key_pem, "", "", ""))
+                      .with_issuer("auth0");
+  verifier.verify(jwt_decoded_token);
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS384) {
+  // Cryptographic algorithm: RS384.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS384",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS384", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(rsa_pub_key_pem, key1->get_key());
+
+  // Create a JWT token and sign it with RS384.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS384")
+          .set_key_id(kid_1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs384(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS512) {
+  // Cryptographic algorithm: RS512.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS512",
+      rsa512_pub_key_jwk_n, rsa512_pub_key_jwk_e, kid_2, "RS512",
+      rsa512_invalid_pub_key_jwk_n, rsa512_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(rsa512_pub_key_pem, key1->get_key());
+
+  // Create a JWT token and sign it with RS512.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS512")
+          .set_key_id(kid_1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs512(rsa512_pub_key_pem, rsa512_priv_key_pem, "", ""));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtNotVerifySignature) {
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+
+  // Do not verify signature.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  Status status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtFailMismatchingAlgorithms) {
+  // The JWT algorithm does not match the algorithm in the JWK.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+
+  // Create a JWT token, but set a mismatching algorithm.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS512")
+          .set_key_id(kid_1)
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  // Verification of the token fails due to the mismatching algorithms.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find(
+                  "JWT algorithm 'rs512' is not matching with JWK algorithm 'rs256'")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtFailKeyNotFound) {
+  // The key cannot be found in JWKS.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+
+  // Create a JWT token with a key ID that cannot be found in JWKS.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id("unfound-key-id")
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  // Verification of the token fails since the key is not found in JWKS.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(
+      status.GetDetail().find("Invalid JWK ID in the JWT token") != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtTokenWithoutKeyId) {
+  // Verify JWT token without key ID.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+
+  // Create a JWT token without key ID.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").set_algorithm("RS256").sign(
+          jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  // Verify the token by trying each key in the JWK set; one key matches.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+}
+
+TEST(JwtUtilTest, VerifyJwtFailTokenWithoutKeyId) {
+  // Verify JWT token without key ID.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+
+  // Create a JWT token without key ID.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").set_algorithm("RS512").sign(
+          jwt::algorithm::rs512(rsa512_pub_key_pem, rsa512_priv_key_pem, "", ""));
+  // Verify the token by trying each key in the JWK set, but no key matches.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+}
+
+TEST(JwtUtilTest, VerifyJwtFailTokenWithoutSignature) {
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+
+  // Create a JWT token without signature.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").sign(jwt::algorithm::none{});
+  // Failed to verify the unsigned token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find("Unsecured JWT") != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtFailExpiredToken) {
+  // Sign JWT token with RS256.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  EXPECT_OK(status);
+
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id(kid_1)
+          .set_issued_at(std::chrono::system_clock::now())
+          .set_expires_at(std::chrono::system_clock::now() - std::chrono::seconds{10})
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+
+  // Verify the token, including its expiration time.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  EXPECT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find("Verification failed, error: token expired")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+} // namespace impala
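
Note on the unsigned-token test above: a compact JWT has the form header.payload.signature, and an unsecured token (alg "none") carries an empty third segment. The following is a minimal standalone sketch of that structural check, not code from this patch. CompactJwtParts, SplitCompactJwt and HasSignature are hypothetical names introduced only for illustration; the patch itself relies on jwt-cpp's decoded token and get_signature().

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper type for illustration; not part of the Impala patch.
struct CompactJwtParts {
  std::string header;
  std::string payload;
  std::string signature;
};

// Splits "header.payload.signature" on its two dots. Returns false if the input
// does not contain exactly two dots; 'out' is only written on success.
bool SplitCompactJwt(const std::string& token, CompactJwtParts* out) {
  const std::size_t first = token.find('.');
  if (first == std::string::npos) return false;
  const std::size_t second = token.find('.', first + 1);
  if (second == std::string::npos) return false;
  if (token.find('.', second + 1) != std::string::npos) return false;
  out->header = token.substr(0, first);
  out->payload = token.substr(first + 1, second - first - 1);
  out->signature = token.substr(second + 1);
  return true;
}

// Mirrors the idea behind the "Unsecured JWT" rejection: a token whose signature
// segment is empty is never accepted.
bool HasSignature(const CompactJwtParts& parts) { return !parts.signature.empty(); }
```

This sketch only checks the shape of the token. It does not Base64url-decode the segments or validate any signature.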
diff --git a/be/src/util/jwt-util.cc b/be/src/util/jwt-util.cc
new file mode 100644
index 0000000..27c4a68
--- /dev/null
+++ b/be/src/util/jwt-util.cc
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string.h>
+#include <cerrno>
+#include <ostream>
+#include <unordered_map>
+#include <vector>
+#include <sys/stat.h>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/escaping.h>
+#include <gutil/strings/substitute.h>
+#include <openssl/bio.h>
+#include <openssl/ec.h>
+#include <openssl/evp.h>
+#include <openssl/pem.h>
+#include <openssl/rsa.h>
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/filereadstream.h>
+
+#include "common/names.h"
+#include "jwt-util-internal.h"
+#include "jwt-util.h"
+
+namespace impala {
+
+using rapidjson::Document;
+using rapidjson::Value;
+
+// A JWK Set (JSON Web Key Set) is a JSON data structure that represents a set of JWKs.
+// This class parses a JWKS file.
+class JWKSetParser {
+ public:
+  JWKSetParser(JsonWebKeySet* jwks) : jwks_(jwks) {}
+
+  // Perform the parsing and populate JWKS's internal map. Return error status if
+  // encountering any error.
+  Status Parse(const Document& rules_doc) {
+    bool found_keys = false;
+    for (Value::ConstMemberIterator member = rules_doc.MemberBegin();
+         member != rules_doc.MemberEnd(); ++member) {
+      if (strcmp("keys", member->name.GetString()) == 0) {
+        found_keys = true;
+        RETURN_IF_ERROR(ParseKeys(member->value));
+      } else {
+        return Status(TErrorCode::JWKS_PARSE_ERROR,
+            Substitute(
+                "Unexpected property '$0' must be removed", member->name.GetString()));
+      }
+    }
+    if (!found_keys) {
+      return Status(TErrorCode::JWKS_PARSE_ERROR, "An array of keys is required");
+    }
+    return Status::OK();
+  }
+
+ private:
+  JsonWebKeySet* jwks_;
+
+  string NameOfTypeOfJsonValue(const Value& value) {
+    switch (value.GetType()) {
+      case rapidjson::kNullType:
+        return "Null";
+      case rapidjson::kFalseType:
+      case rapidjson::kTrueType:
+        return "Bool";
+      case rapidjson::kObjectType:
+        return "Object";
+      case rapidjson::kArrayType:
+        return "Array";
+      case rapidjson::kStringType:
+        return "String";
+      case rapidjson::kNumberType:
+        if (value.IsInt()) return "Integer";
+        if (value.IsDouble()) return "Float";
+      default:
+        DCHECK(false);
+        return "Unknown";
+    }
+  }
+
+  // Parse an array of keys.
+  Status ParseKeys(const Value& keys) {
+    if (!keys.IsArray()) {
+      return Status(TErrorCode::JWKS_PARSE_ERROR,
+          Substitute(
+              "'keys' must be of type Array but is a '$0'", NameOfTypeOfJsonValue(keys)));
+    } else if (keys.Size() == 0) {
+      return Status(
+          TErrorCode::JWKS_PARSE_ERROR, Substitute("'keys' must be a non empty Array"));
+    }
+    for (rapidjson::SizeType key_idx = 0; key_idx < keys.Size(); ++key_idx) {
+      const Value& key = keys[key_idx];
+      if (!key.IsObject()) {
+        return Status(TErrorCode::JWKS_PARSE_ERROR,
+            Substitute("parsing key #$0, key should be a JSON Object but is a '$1'.",
+                key_idx, NameOfTypeOfJsonValue(key)));
+      }
+      Status status = ParseKey(key);
+      if (!status.ok()) {
+        Status parse_status(
+            TErrorCode::JWKS_PARSE_ERROR, Substitute("parsing key #$0, ", key_idx));
+        parse_status.MergeStatus(status);
+        return parse_status;
+      }
+    }
+    return Status::OK();
+  }
+
+  // Parse a public key and populate JWKS's internal map.
+  Status ParseKey(const Value& json_key) {
+    std::unordered_map<std::string, std::string> kv_map;
+    string k, v;
+    for (Value::ConstMemberIterator member = json_key.MemberBegin();
+         member != json_key.MemberEnd(); ++member) {
+      k = string(member->name.GetString());
+      RETURN_IF_ERROR(ReadKeyProperty(k.c_str(), json_key, &v, /*required*/ false));
+      if (kv_map.find(k) == kv_map.end()) {
+        kv_map.insert(make_pair(k, v));
+      } else {
+        LOG(WARNING) << "Duplicate property of JWK: " << k;
+      }
+    }
+
+    auto it_kty = kv_map.find("kty");
+    if (it_kty == kv_map.end()) return Status("'kty' property is required");
+    auto it_kid = kv_map.find("kid");
+    if (it_kid == kv_map.end()) return Status("'kid' property is required");
+    string key_id = it_kid->second;
+    if (key_id.empty()) {
+      return Status(Substitute("'kid' property must be a non-empty string"));
+    }
+
+    Status status;
+    string key_type = boost::algorithm::to_lower_copy(it_kty->second);
+    if (key_type.compare("oct") == 0) {
+      JWTPublicKey* jwt_pub_key;
+      status = HSJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddHSKey(key_id, jwt_pub_key);
+    } else if (key_type.compare("rsa") == 0) {
+      JWTPublicKey* jwt_pub_key;
+      status = RSAJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddRSAPublicKey(key_id, jwt_pub_key);
+    } else {
+      return Status(Substitute("Unsupported kty: '$0'", key_type));
+    }
+    return status;
+  }
+
+  // Reads a key property of the given name and assigns the property value to the out
+  // parameter. Returns Status::OK() on success.
+  template <typename T>
+  Status ReadKeyProperty(
+      const string& name, const Value& json_key, T* value, bool required = true) {
+    const Value& json_value = json_key[name.c_str()];
+    if (json_value.IsNull()) {
+      if (required) {
+        return Status(Substitute("'$0' property is required and cannot be null", name));
+      } else {
+        return Status::OK();
+      }
+    }
+    return ValidateTypeAndExtractValue(name, json_value, value);
+  }
+
+// Extract a value stored in a rapidjson::Value and assign it to the out parameter.
+// The type will be validated before extraction. Returns Status::OK() on success.
+// The name parameter is only used to generate an error message upon failure.
+#define EXTRACT_VALUE(json_type, cpp_type)                                             \
+  Status ValidateTypeAndExtractValue(                                                  \
+      const string& name, const Value& json_value, cpp_type* value) {                  \
+    if (!json_value.Is##json_type()) {                                                 \
+      return Status(                                                                   \
+          Substitute("'$0' property must be of type " #json_type " but is a $1", name, \
+              NameOfTypeOfJsonValue(json_value)));                                     \
+    }                                                                                  \
+    *value = json_value.Get##json_type();                                              \
+    return Status::OK();                                                               \
+  }
+
+  EXTRACT_VALUE(String, string)
+  // EXTRACT_VALUE(Bool, bool)
+};
+
+//
+// JWTPublicKey member functions.
+//
+// Verify JWT's signature for the given decoded token with jwt-cpp API.
+Status JWTPublicKey::Verify(
+    const DecodedJWT& decoded_jwt, const std::string& algorithm) const {
+  // Verify if algorithms are matching.
+  if (algorithm_.compare(algorithm) != 0) {
+    return Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("JWT algorithm '$0' is not matching with JWK algorithm '$1'",
+            algorithm, algorithm_));
+  }
+
+  Status status;
+  try {
+    // Call jwt-cpp API to verify token's signature.
+    verifier_.verify(decoded_jwt);
+  } catch (const jwt::error::rsa_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED, Substitute("RSA error: $0", e.what()));
+  } catch (const jwt::error::token_verification_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Verification failed, error: $0", e.what()));
+  } catch (const std::exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Verification failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+// Create a JWKPublicKey of HS from the JWK.
+Status HSJWTPublicKeyBuilder::CreateJWKPublicKey(
+    JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // Octet Sequence keys for HS256, HS384 or HS512.
+  // JWK Sample:
+  // {
+  //   "kty":"oct",
+  //   "alg":"HS256",
+  //   "k":"f83OJ3D2xF1Bg8vub9tLe1gHMzV76e8Tus9uPHvRVEU",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  auto it_alg = kv_map.find("alg");
+  if (it_alg == kv_map.end()) return Status("'alg' property is required");
+  string algorithm = boost::algorithm::to_lower_copy(it_alg->second);
+  if (algorithm.empty()) {
+    return Status(Substitute("'alg' property must be a non-empty string"));
+  }
+  auto it_k = kv_map.find("k");
+  if (it_k == kv_map.end()) return Status("'k' property is required");
+  if (it_k->second.empty()) {
+    return Status(Substitute("'k' property must be a non-empty string"));
+  }
+
+  Status status;
+  JWTPublicKey* jwt_pub_key = nullptr;
+  try {
+    if (algorithm.compare("hs256") == 0) {
+      jwt_pub_key = new HS256JWTPublicKey(algorithm, it_k->second);
+    } else if (algorithm.compare("hs384") == 0) {
+      jwt_pub_key = new HS384JWTPublicKey(algorithm, it_k->second);
+    } else if (algorithm.compare("hs512") == 0) {
+      jwt_pub_key = new HS512JWTPublicKey(algorithm, it_k->second);
+    } else {
+      return Status(Substitute("Invalid 'alg' property value: '$0'", algorithm));
+    }
+  } catch (const std::exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+  }
+  if (!status.ok()) return status;
+  *pub_key_out = jwt_pub_key;
+  return Status::OK();
+}
+
+// Create a JWKPublicKey of RSA from the JWK.
+Status RSAJWTPublicKeyBuilder::CreateJWKPublicKey(
+    JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // JWK Sample:
+  // {
+  //   "kty":"RSA",
+  //   "alg":"RS256",
+  //   "n":"sttddbg-_yjXzcFpbMJB1fI9...Q_QDhvqXx8eQ1r9smM",
+  //   "e":"AQAB",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  auto it_alg = kv_map.find("alg");
+  if (it_alg == kv_map.end()) return Status("'alg' property is required");
+  string algorithm = boost::algorithm::to_lower_copy(it_alg->second);
+  if (algorithm.empty()) {
+    return Status(Substitute("'alg' property must be a non-empty string"));
+  }
+
+  auto it_n = kv_map.find("n");
+  auto it_e = kv_map.find("e");
+  if (it_n == kv_map.end() || it_e == kv_map.end()) {
+    return Status("'n' and 'e' properties are required");
+  } else if (it_n->second.empty() || it_e->second.empty()) {
+    return Status("'n' and 'e' properties must be a non-empty string");
+  }
+  // Converts public key to PEM encoded form.
+  string pub_key;
+  if (!ConvertJwkToPem(it_n->second, it_e->second, pub_key)) {
+    return Status(
+        Substitute("Invalid public key 'n':'$0', 'e':'$1'", it_n->second, it_e->second));
+  }
+
+  Status status;
+  JWTPublicKey* jwt_pub_key = nullptr;
+  try {
+    if (algorithm.compare("rs256") == 0) {
+      jwt_pub_key = new RS256JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm.compare("rs384") == 0) {
+      jwt_pub_key = new RS384JWTPublicKey(algorithm, pub_key);
+    } else if (algorithm.compare("rs512") == 0) {
+      jwt_pub_key = new RS512JWTPublicKey(algorithm, pub_key);
+    } else {
+      return Status(Substitute("Invalid 'alg' property value: '$0'", algorithm));
+    }
+  } catch (const jwt::error::rsa_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED, Substitute("RSA error: $0", e.what()));
+  } catch (const std::exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+  }
+  if (!status.ok()) return status;
+  *pub_key_out = jwt_pub_key;
+  return Status::OK();
+}
+
+// Convert public key of RSA from JWK format to PEM encoded format by using OpenSSL APIs.
+bool RSAJWTPublicKeyBuilder::ConvertJwkToPem(
+    const std::string& base64_n, const std::string& base64_e, std::string& pub_key) {
+  pub_key.clear();
+  string str_n, str_e;
+  if (!WebSafeBase64Unescape(base64_n, &str_n)) return false;
+  if (!WebSafeBase64Unescape(base64_e, &str_e)) return false;
+  BIGNUM* modul = BN_bin2bn((const unsigned char*)str_n.c_str(), str_n.size(), nullptr);
+  BIGNUM* expon = BN_bin2bn((const unsigned char*)str_e.c_str(), str_e.size(), nullptr);
+
+  RSA* rsa = RSA_new();
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  rsa->n = modul;
+  rsa->e = expon;
+#else
+  // RSA_set0_key is a new API introduced in OpenSSL version 1.1
+  RSA_set0_key(rsa, modul, expon, nullptr);
+#endif
+
+  unsigned char desc[1024];
+  memset(desc, 0, 1024);
+  BIO* bio = BIO_new(BIO_s_mem());
+  PEM_write_bio_RSA_PUBKEY(bio, rsa);
+  if (BIO_read(bio, desc, 1024) > 0) {
+    pub_key = (char*)desc;
+    // Remove last '\n'.
+    if (pub_key.length() > 0 && pub_key[pub_key.length() - 1] == '\n') pub_key.pop_back();
+  }
+  BIO_free(bio);
+  RSA_free(rsa);
+  if (pub_key.empty()) return false;
+  return true;
+}
+
+//
+// JsonWebKeySet member functions.
+//
+
+Status JsonWebKeySet::Init(const string& jwks_file_path) {
+  hs_key_map_.clear();
+  rsa_pub_key_map_.clear();
+
+  // Read the file.
+  FILE* jwks_file = fopen(jwks_file_path.c_str(), "r");
+  if (jwks_file == nullptr) {
+    return Status(
+        Substitute("Could not open JWKS file '$0'; $1", jwks_file_path, strerror(errno)));
+  }
+  // Check for an empty file and ignore it.
+  struct stat jwks_file_stats;
+  if (fstat(fileno(jwks_file), &jwks_file_stats)) {
+    fclose(jwks_file);
+    return Status(
+        Substitute("Error reading JWKS file '$0'; $1", jwks_file_path, strerror(errno)));
+  }
+  if (jwks_file_stats.st_size == 0) {
+    fclose(jwks_file);
+    return Status::OK();
+  }
+
+  char readBuffer[65536];
+  rapidjson::FileReadStream stream(jwks_file, readBuffer, sizeof(readBuffer));
+  Document jwks_doc;
+  jwks_doc.ParseStream(stream);
+  fclose(jwks_file);
+  if (jwks_doc.HasParseError()) {
+    return Status(
+        TErrorCode::JWKS_PARSE_ERROR, GetParseError_En(jwks_doc.GetParseError()));
+  }
+  if (!jwks_doc.IsObject()) {
+    return Status(TErrorCode::JWKS_PARSE_ERROR, "root element must be a JSON Object");
+  }
+  if (!jwks_doc.HasMember("keys")) {
+    return Status(TErrorCode::JWKS_PARSE_ERROR, "keys is required");
+  }
+
+  JWKSetParser jwks_parser(this);
+  return jwks_parser.Parse(jwks_doc);
+}
+
+void JsonWebKeySet::AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
+  if (hs_key_map_.find(key_id) == hs_key_map_.end()) {
+    hs_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for HS key: " << key_id;
+  }
+}
+
+void JsonWebKeySet::AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
+  if (rsa_pub_key_map_.find(key_id) == rsa_pub_key_map_.end()) {
+    rsa_pub_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for RSA public key: " << key_id;
+  }
+}
+
+const JWTPublicKey* JsonWebKeySet::LookupHSKey(const std::string& kid) const {
+  auto find_it = hs_key_map_.find(kid);
+  if (find_it == hs_key_map_.end()) {
+    // Could not find key for the given key ID.
+    return nullptr;
+  }
+  return find_it->second.get();
+}
+
+const JWTPublicKey* JsonWebKeySet::LookupRSAPublicKey(const std::string& kid) const {
+  auto find_it = rsa_pub_key_map_.find(kid);
+  if (find_it == rsa_pub_key_map_.end()) {
+    // Could not find key for the given key ID.
+    return nullptr;
+  }
+  return find_it->second.get();
+}
+
+//
+// JWTHelper member functions.
+//
+
+struct JWTHelper::JWTDecodedToken {
+  JWTDecodedToken(const DecodedJWT& decoded_jwt) : decoded_jwt_(decoded_jwt) {}
+  DecodedJWT decoded_jwt_;
+};
+
+JWTHelper* JWTHelper::jwt_helper_ = new JWTHelper();
+
+void JWTHelper::TokenDeleter::operator()(JWTHelper::JWTDecodedToken* token) const {
+  if (token != nullptr) delete token;
+};
+
+Status JWTHelper::Init(const std::string& jwks_file_path) {
+  jwks_.reset(new JsonWebKeySet());
+  RETURN_IF_ERROR(jwks_->Init(jwks_file_path));
+  if (jwks_->IsEmpty()) LOG(WARNING) << "JWKS file is empty.";
+  initialized_ = true;
+  return Status::OK();
+}
+
+// Decode the given JWT token.
+Status JWTHelper::Decode(const string& token, UniqueJWTDecodedToken& decoded_token_out) {
+  Status status;
+  try {
+    // Call jwt-cpp API to decode the JWT token with default jwt::json_traits
+    // (jwt::picojson_traits).
+    decoded_token_out.reset(new JWTDecodedToken(jwt::decode(token)));
+#ifndef NDEBUG
+    std::stringstream msg;
+    msg << "JWT token header: ";
+    for (auto& e : decoded_token_out.get()->decoded_jwt_.get_header_claims()) {
+      msg << e.first << "=" << e.second.to_json().serialize() << ";";
+    }
+    msg << " JWT token payload: ";
+    for (auto& e : decoded_token_out.get()->decoded_jwt_.get_payload_claims()) {
+      msg << e.first << "=" << e.second.to_json().serialize() << ";";
+    }
+    VLOG(3) << msg.str();
+#endif
+  } catch (const std::invalid_argument& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Token is not in correct format, error: $0", e.what()));
+  } catch (const std::runtime_error& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Base64 decoding failed or invalid json, error: $0", e.what()));
+  }
+  return status;
+}
+
+// Validate the token's signature with public key.
+Status JWTHelper::Verify(const JWTDecodedToken* decoded_token) const {
+  DCHECK(initialized_);
+  DCHECK(decoded_token != nullptr);
+
+  if (decoded_token->decoded_jwt_.get_signature().empty()) {
+    // Don't accept JWT without a signature.
+    return Status(TErrorCode::JWT_VERIFY_FAILED, "Unsecured JWT");
+  } else if (jwks_ == nullptr) {
+    // Skip signature validation if there is no public key.
+    return Status::OK();
+  }
+
+  Status status;
+  try {
+    string algorithm =
+        boost::algorithm::to_lower_copy(decoded_token->decoded_jwt_.get_algorithm());
+    if (decoded_token->decoded_jwt_.has_key_id()) {
+      // Get key id from token's header and use it to retrieve the public key from JWKS.
+      std::string key_id = decoded_token->decoded_jwt_.get_key_id();
+
+      const JWTPublicKey* pub_key = nullptr;
+      if (algorithm.substr(0, 2).compare("hs") == 0) {
+        pub_key = jwks_->LookupHSKey(key_id);
+      } else if (algorithm.substr(0, 2).compare("rs") == 0) {
+        pub_key = jwks_->LookupRSAPublicKey(key_id);
+      } else {
+        return Status(TErrorCode::JWT_VERIFY_FAILED,
+            Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
+      }
+      if (pub_key == nullptr) {
+        return Status(TErrorCode::JWT_VERIFY_FAILED, "Invalid JWK ID in the JWT token");
+      }
+      // Use the public key to verify the token's signature.
+      status = pub_key->Verify(decoded_token->decoded_jwt_, algorithm);
+    } else {
+      // According to RFC 7517 (JSON Web Key), 'kid' is OPTIONAL so it's possible there
+      // is no key id in the token's header. In this case, get all of the public keys
+      // from the JWKS for the family of algorithms.
+      const JsonWebKeySet::JWTPublicKeyMap* key_map = nullptr;
+      if (algorithm.substr(0, 2).compare("hs") == 0) {
+        key_map = jwks_->GetAllHSKeys();
+      } else if (algorithm.substr(0, 2).compare("rs") == 0) {
+        key_map = jwks_->GetAllRSAPublicKeys();
+      } else {
+        return Status(TErrorCode::JWT_VERIFY_FAILED,
+            Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
+      }
+      if (key_map->size() == 0) {
+        return Status(
+            TErrorCode::JWT_VERIFY_FAILED, "Verification failed, no matching key");
+      }
+      // Try each key with matching algorithm until the signature is verified.
+      for (auto& key : *key_map) {
+        status = key.second->Verify(decoded_token->decoded_jwt_, algorithm);
+        if (status.ok()) return status;
+      }
+    }
+  } catch (const std::bad_cast& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Claim was present but not a string, error: $0", e.what()));
+  } catch (const jwt::error::claim_not_present_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Claim not present in JWT token, error $0", e.what()));
+  } catch (const std::exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Token verification failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+Status JWTHelper::GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
+    const string& jwt_custom_claim_username, string& username) {
+  DCHECK(decoded_token != nullptr);
+  DCHECK(!jwt_custom_claim_username.empty());
+  Status status;
+  try {
+    // Get value of custom claim 'username' from the token payload.
+    if (decoded_token->decoded_jwt_.has_payload_claim(jwt_custom_claim_username)) {
+      // Assume the claim data type of 'username' is string.
+      username.assign(
+          decoded_token->decoded_jwt_.get_payload_claim(jwt_custom_claim_username)
+              .to_json()
+              .to_str());
+      if (username.empty()) {
+        status = Status(TErrorCode::JWT_VERIFY_FAILED,
+            Substitute("Claim '$0' is empty", jwt_custom_claim_username));
+      }
+    } else {
+      status = Status(TErrorCode::JWT_VERIFY_FAILED,
+          Substitute("Claim '$0' was not present", jwt_custom_claim_username));
+    }
+  } catch (const std::runtime_error& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Claim '$0' was not present, error: $1", jwt_custom_claim_username,
+            e.what()));
+  }
+  return status;
+}
+
+} // namespace impala
\ No newline at end of file
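
Note on key selection in JWTHelper::Verify() above: when the token header names a key ID, only that key is consulted; otherwise every key of the matching algorithm family is tried until one accepts the token. The following is a minimal standalone sketch of that control flow under simplifying assumptions. KeyVerifier, KeyMap and VerifyWithKeySet are hypothetical names, an empty key_id string stands in for "no 'kid' in the header", and a plain callback stands in for JWTPublicKey::Verify().

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>

// Hypothetical stand-in for a per-key verifier: returns true when the key accepts
// the token. In the patch this role is played by JWTPublicKey::Verify().
using KeyVerifier = std::function<bool(const std::string& token)>;
using KeyMap = std::map<std::string, KeyVerifier>;

// Returns true if the token verifies. With a non-empty key ID only that key is
// consulted; with an empty key ID (assumed here to mean "no 'kid' header") every
// key in the map is tried in turn.
bool VerifyWithKeySet(
    const KeyMap& keys, const std::string& key_id, const std::string& token) {
  if (!key_id.empty()) {
    auto it = keys.find(key_id);
    if (it == keys.end()) return false;  // Unknown key ID.
    return it->second(token);
  }
  for (const auto& entry : keys) {
    if (entry.second(token)) return true;  // First accepting key wins.
  }
  return false;  // No key (or an empty key set) accepted the token.
}
```

Trying every key costs one verification attempt per key in the worst case, which is why a token that carries a key ID is the cheaper path.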
diff --git a/be/src/util/jwt-util.h b/be/src/util/jwt-util.h
new file mode 100644
index 0000000..92e4468
--- /dev/null
+++ b/be/src/util/jwt-util.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_JWT_UTIL_H
+#define IMPALA_JWT_UTIL_H
+
+#include <string>
+
+#include "common/logging.h"
+#include "common/status.h"
+
+namespace impala {
+
+class JsonWebKeySet;
+
+/// JSON Web Token (JWT) is an Internet proposed standard for creating data with optional
+/// signature and/or optional encryption whose payload holds JSON that asserts some
+/// number of claims. The tokens are signed either using a private secret or a public/
+/// private key.
+/// This class works as a wrapper for jwt-cpp. It provides APIs to decode and verify a
+/// JWT token, and to extract a custom claim from the payload of the JWT token.
+/// JsonWebKeySet is read only after Init() is called. The class is thread safe.
+class JWTHelper {
+ public:
+  /// Opaque types for storing the JWT decoded token. This allows us to avoid including
+  /// header file jwt-cpp/jwt.h.
+  struct JWTDecodedToken;
+
+  // Custom deleter: intended for use with std::unique_ptr<JWTDecodedToken>.
+  class TokenDeleter {
+   public:
+    /// Called by unique_ptr to free JWTDecodedToken
+    void operator()(JWTHelper::JWTDecodedToken* token) const;
+  };
+  /// UniqueJWTDecodedToken -- a wrapper around opaque decoded token structure to
+  /// facilitate automatic cleanup.
+  typedef std::unique_ptr<JWTDecodedToken, TokenDeleter> UniqueJWTDecodedToken;
+
+  /// Return the single instance.
+  static JWTHelper* GetInstance() { return jwt_helper_; }
+
+  /// Load JWKS from a JSON file. Returns an error if problems were encountered.
+  Status Init(const std::string& jwks_file_path);
+
+  /// Decode the given JWT token. The decoding result is stored in decoded_token_out.
+  /// Return Status::OK if the decoding is successful.
+  static Status Decode(
+      const std::string& token, UniqueJWTDecodedToken& decoded_token_out);
+
+  /// Verify the token's signature with the given JWKS. The token should be already
+  /// decoded by calling Decode().
+  /// Return Status::OK if the verification is successful.
+  Status Verify(const JWTDecodedToken* decoded_token) const;
+
+  /// Extract custom claim "Username" from the payload of the decoded JWT token.
+  /// Return Status::OK if the extraction is successful.
+  static Status GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
+      const std::string& custom_claim_username, std::string& username);
+
+  /// Return Json Web Key Set. It's called only by unit-test code.
+  const JsonWebKeySet* GetJWKS() const { return jwks_.get(); }
+
+ private:
+  /// Single instance.
+  static JWTHelper* jwt_helper_;
+
+  /// Set it as TRUE when Init() is called.
+  bool initialized_ = false;
+
+  /// Json Web Key Set (JWKS) for Json Web Token (JWT) verification.
+  /// Only one instance per daemon.
+  std::unique_ptr<JsonWebKeySet> jwks_;
+};
+
+} // namespace impala
+
+#endif
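
Note on the opaque token type in jwt-util.h above: the header only forward-declares JWTDecodedToken and pairs it with a deleter whose operator() is defined in the .cc file, so callers can hold a std::unique_ptr to it without including jwt-cpp/jwt.h. The following is a minimal standalone sketch of that pattern. TokenBox, Opaque, Make and Length are hypothetical names used only for illustration, and the two halves are shown in one block although they would normally live in separate files.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>

// --- What the header would expose (hypothetical names) ---
class TokenBox {
 public:
  struct Opaque;  // Incomplete here; defined in the implementation part below.
  struct Deleter {
    // Declared only; the definition needs the complete type.
    void operator()(Opaque* p) const;
  };
  using UniqueOpaque = std::unique_ptr<Opaque, Deleter>;

  static void Make(const std::string& text, UniqueOpaque& out);
  static std::size_t Length(const Opaque* p);
};

// --- What the .cc file would contain ---
struct TokenBox::Opaque {
  explicit Opaque(const std::string& t) : text(t) {}
  std::string text;
};

// Deleting a null pointer is a no-op, so no explicit null check is needed.
void TokenBox::Deleter::operator()(Opaque* p) const { delete p; }

void TokenBox::Make(const std::string& text, UniqueOpaque& out) {
  out.reset(new Opaque(text));
}

std::size_t TokenBox::Length(const Opaque* p) { return p->text.size(); }
```

With the default deleter, std::unique_ptr needs the complete type wherever the pointer is destroyed. Moving the delete into an out-of-line deleter is what lets the type stay incomplete for callers.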
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 74f5c3c..5a6c1e2 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -41,8 +41,8 @@
 #include "kudu/util/env.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/net/sockaddr.h"
-#include "rpc/authentication.h"
 #include "rpc/authentication-util.h"
+#include "rpc/authentication.h"
 #include "rpc/thrift-util.h"
 #include "runtime/exec-env.h"
 #include "service/impala-server.h"
@@ -52,6 +52,7 @@
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
+#include "util/jwt-util.h"
 #include "util/mem-info.h"
 #include "util/metrics.h"
 #include "util/os-info.h"
@@ -150,6 +151,9 @@ DECLARE_string(ssl_minimum_version);
 DECLARE_string(ssl_cipher_list);
 DECLARE_string(trusted_domain);
 DECLARE_bool(trusted_domain_use_xff_header);
+DECLARE_bool(jwt_token_auth);
+DECLARE_bool(jwt_validate_signature);
+DECLARE_string(jwt_custom_claim_username);
 
 static const char* DOC_FOLDER = "/www/";
 static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
@@ -282,7 +286,8 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
     error_handler_(UrlHandler(
         bind<void>(&Webserver::ErrorHandler, this, _1, _2), "error.tmpl", false)),
     use_cookies_(FLAGS_max_cookie_lifetime_s > 0),
-    check_trusted_domain_(!FLAGS_trusted_domain.empty()) {
+    check_trusted_domain_(!FLAGS_trusted_domain.empty()),
+    use_jwt_(FLAGS_jwt_token_auth) {
   http_address_ = MakeNetworkAddress(interface.empty() ? "0.0.0.0" : interface, port);
   Init();
 
@@ -309,6 +314,12 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
     total_trusted_domain_check_success_ =
         metrics->AddCounter("impala.webserver.total-trusted-domain-check-success", 0);
   }
+  if (use_jwt_) {
+    total_jwt_token_auth_success_ =
+        metrics->AddCounter("impala.webserver.total-jwt-token-auth-success", 0);
+    total_jwt_token_auth_failure_ =
+        metrics->AddCounter("impala.webserver.total-jwt-token-auth-failure", 0);
+  }
 }
 
 Webserver::~Webserver() {
@@ -600,8 +611,36 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
   }
 
   vector<string> response_headers;
-  bool authenticated = auth_mode_ != AuthMode::SPNEGO && auth_mode_ != AuthMode::LDAP;
-  // Try authenticating with a cookie first, if enabled.
+  bool authenticated = false;
+  // Try authenticating with JWT token first, if enabled.
+  if (use_jwt_) {
+    const char* auth_value = nullptr;
+    const char* value = sq_get_header(connection, "Authorization");
+    if (value != nullptr) auth_value = StripLeadingWhiteSpace(value);
+    // Check Authorization header with the Bearer authentication scheme as:
+    // Authorization: Bearer <token>
+    // A well-formed JWT consists of three concatenated Base64url-encoded strings,
+    // separated by dots (.).
+    if (auth_value != nullptr && strncasecmp(auth_value, "Bearer ", 7) == 0
+        && strchr(auth_value, '.') != nullptr) {
+      string jwt_token = string(auth_value + 7);
+      StripWhiteSpace(&jwt_token);
+      if (!jwt_token.empty()) {
+        if (JWTTokenAuth(jwt_token, connection, request_info)) {
+          total_jwt_token_auth_success_->Increment(1);
+          authenticated = true;
+          // TODO: cookies are not added, but are not needed right now
+        } else {
+          LOG(INFO) << "Invalid JWT token provided: " << jwt_token;
+          total_jwt_token_auth_failure_->Increment(1);
+        }
+      }
+    }
+  }
+  if (!authenticated) {
+    authenticated = auth_mode_ != AuthMode::SPNEGO && auth_mode_ != AuthMode::LDAP;
+  }
+  // Try authenticating with a cookie, if enabled.
   if (!authenticated && use_cookies_) {
     const char* cookie_header = sq_get_header(connection, "Cookie");
     string username;
@@ -832,6 +871,37 @@ bool Webserver::TrustedDomainCheck(const string& origin, struct sq_connection* c
   return true;
 }
 
+bool Webserver::JWTTokenAuth(const std::string& jwt_token,
+    struct sq_connection* connection, struct sq_request_info* request_info) {
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  Status status = JWTHelper::Decode(jwt_token, decoded_token);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error decoding JWT token in Authorization header, "
+               << "Error: " << status;
+    return false;
+  }
+  if (FLAGS_jwt_validate_signature) {
+    status = JWTHelper::GetInstance()->Verify(decoded_token.get());
+    if (!status.ok()) {
+      LOG(ERROR) << "Error verifying JWT token in Authorization header, "
+                 << "Error: " << status;
+      return false;
+    }
+  }
+
+  DCHECK(!FLAGS_jwt_custom_claim_username.empty());
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(
+      decoded_token.get(), FLAGS_jwt_custom_claim_username, username);
+  if (!status.ok()) {
+    LOG(ERROR) << "Cannot retrieve username from JWT token in Authorization header, "
+               << "Error: " << status;
+    return false;
+  }
+  request_info->remote_user = strdup(username.c_str());
+  return true;
+}
+
 Status Webserver::HandleBasic(struct sq_connection* connection,
     struct sq_request_info* request_info, vector<string>* response_headers) {
   const char* authz_header = sq_get_header(connection, "Authorization");
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index 96698c1..70438b2 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -201,6 +201,11 @@ class Webserver {
   bool TrustedDomainCheck(const std::string& origin, struct sq_connection* connection,
       struct sq_request_info* request_info);
 
+  /// Checks and returns true if the JWT token in Authorization header could be verified
+  /// and the token has a valid username.
+  bool JWTTokenAuth(const std::string& jwt_token, struct sq_connection* connection,
+      struct sq_request_info* request_info);
+
   // Handle Basic authentication for this request. Returns an error if authentication was
   // unsuccessful.
   Status HandleBasic(struct sq_connection* connection,
@@ -267,6 +272,10 @@ class Webserver {
   /// auth if it originates from a trusted domain.
   bool check_trusted_domain_;
 
+  /// If true, the JWT token in Authorization header will be used for authentication.
+  /// An incoming connection will be accepted if the JWT token could be verified.
+  bool use_jwt_ = false;
+
   /// Used to validate usernames/passwords If LDAP authentication is in use.
   std::unique_ptr<ImpalaLdap> ldap_;
 
@@ -288,6 +297,11 @@ class Webserver {
   /// If 'use_cookies_' is true, metrics for the number of successful
   /// attempts to authorize connections originating from a trusted domain.
   IntCounter* total_trusted_domain_check_success_ = nullptr;
+
+  /// If 'use_jwt_' is true, metrics for the number of successful and failed JWT auth
+  /// attempts.
+  IntCounter* total_jwt_token_auth_success_ = nullptr;
+  IntCounter* total_jwt_token_auth_failure_ = nullptr;
 };
 
 }
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index 580d203..0a53b02 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -438,7 +438,7 @@ def get_toolchain_downloads():
   toolchain_packages += [llvm_package, llvm_package_asserts, gcc_package]
   toolchain_packages += map(ToolchainPackage,
       ["avro", "binutils", "boost", "breakpad", "bzip2", "cctz", "cmake", "crcutil",
-       "flatbuffers", "gdb", "gflags", "glog", "gperftools", "gtest", "libev",
+       "flatbuffers", "gdb", "gflags", "glog", "gperftools", "gtest", "jwt-cpp", "libev",
        "libunwind", "lz4", "openldap", "openssl", "orc", "protobuf", "python",
        "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib", "zstd"])
   # Check whether this platform is supported (or whether a valid custom toolchain
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index d461582..08359a1 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -103,6 +103,8 @@ export IMPALA_GPERFTOOLS_VERSION=2.5-p4
 unset IMPALA_GPERFTOOLS_URL
 export IMPALA_GTEST_VERSION=1.6.0
 unset IMPALA_GTEST_URL
+export IMPALA_JWT_CPP_VERSION=0.5.0
+unset IMPALA_JWT_CPP_URL
 export IMPALA_LIBEV_VERSION=4.20
 unset IMPALA_LIBEV_URL
 export IMPALA_LIBUNWIND_VERSION=1.3-rc1-p3
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 366012e..83b5ad9 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -167,6 +167,7 @@ testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2
 testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2_default.expected.txt
 testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2_extended.expected.txt
 testdata/hive_benchmark/grepTiny/part-00000
+testdata/jwt/jwks_rs256.json
 testdata/tzdb/2017c.zip
 testdata/tzdb/2017c-corrupt.zip
 testdata/tzdb_tiny/*
diff --git a/cmake_modules/FindJwtCpp.cmake b/cmake_modules/FindJwtCpp.cmake
new file mode 100644
index 0000000..c475edf
--- /dev/null
+++ b/cmake_modules/FindJwtCpp.cmake
@@ -0,0 +1,38 @@
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+# - Find jwt-cpp headers.
+# JWT_CPP_ROOT hints the location
+# This module defines JWT_CPP_INCLUDE_DIR, the directory containing headers
+
+set(JWT_CPP_SEARCH_HEADER_PATHS ${JWT_CPP_ROOT}/include)
+
+find_path(JWT_CPP_INCLUDE_DIR jwt-cpp/jwt.h HINTS
+  ${JWT_CPP_SEARCH_HEADER_PATHS})
+
+if (NOT JWT_CPP_INCLUDE_DIR)
+  message(FATAL_ERROR "jwt-cpp headers NOT found.")
+  set(JWT_CPP_FOUND FALSE)
+else()
+  set(JWT_CPP_FOUND TRUE)
+endif ()
+
+mark_as_advanced(
+  JWT_CPP_INCLUDE_DIR
+)
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 4c90674..842262d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -471,6 +471,10 @@ error_codes = (
 
   ("LOCAL_DISK_FAULTY", 152,
    "Query execution failure caused by local disk IO fatal error on backend: $0."),
+
+  ("JWKS_PARSE_ERROR", 153, "Error parsing JWKS: $0."),
+
+  ("JWT_VERIFY_FAILED", 154, "Error verifying JWT Token: $0."),
 )
 
 import sys
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index b962145..c5f600e 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1392,6 +1392,26 @@
     "key": "impala.thrift-server.hiveserver2-http-frontend.total-saml-auth-failure"
   },
   {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that were successfully authenticated using JWT Token.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection JWT Token Success",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-jwt-token-auth-success"
+  },
+  {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that attempted to authenticate using a JWT token but were unsuccessful.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection JWT Token Failure",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-jwt-token-auth-failure"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"
@@ -2823,7 +2843,7 @@
     "kind": "COUNTER",
     "key": "impala.webserver.total-cookie-auth-failure"
   },
-    {
+  {
     "description": "The number of HTTP connection requests to this daemon's webserver that originated from a trusted domain.",
     "contexts": [
       "IMPALAD",
@@ -2836,6 +2856,30 @@
     "key": "impala.webserver.total-trusted-domain-check-success"
   },
   {
+    "description": "The number of HTTP connection requests to this daemon's webserver that were successfully authenticated using a JWT token.",
+    "contexts": [
+      "IMPALAD",
+      "CATALOGSERVER",
+      "STATESTORE"
+    ],
+    "label": "Webserver HTTP Connection JWT Token Auth Success",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.webserver.total-jwt-token-auth-success"
+  },
+  {
+    "description": "The number of HTTP connection requests to this daemon's webserver that provided an invalid JWT token.",
+    "contexts": [
+      "IMPALAD",
+      "CATALOGSERVER",
+      "STATESTORE"
+    ],
+    "label": "Webserver HTTP Connection JWT Token Auth Failure",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.webserver.total-jwt-token-auth-failure"
+  },
+  {
     "description": "The number of times the FAIL debug action returned an error. For testing only.",
     "contexts": [
       "IMPALAD"
diff --git a/fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java b/fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java
new file mode 100644
index 0000000..931abba
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hive.service.rpc.thrift.*;
+import org.apache.impala.util.Metrics;
+import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that hiveserver2 operations over the http interface work as expected when
+ * JWT authentication is being used.
+ */
+public class JwtHttpTest {
+  Metrics metrics = new Metrics();
+
+  public void setUp(String extraArgs) throws Exception {
+    int ret = CustomClusterRunner.StartImpalaCluster(extraArgs);
+    assertEquals(ret, 0);
+  }
+
+  static void verifySuccess(TStatus status) throws Exception {
+    if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
+        || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
+      return;
+    }
+    throw new Exception(status.toString());
+  }
+
+  /**
+   * Executes 'query' and fetches the results. Expects there to be exactly one string
+   * returned, which must be equal to 'expectedResult'.
+   */
+  static TOperationHandle execAndFetch(TCLIService.Iface client,
+      TSessionHandle sessionHandle, String query, String expectedResult)
+      throws Exception {
+    TExecuteStatementReq execReq = new TExecuteStatementReq(sessionHandle, query);
+    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
+    verifySuccess(execResp.getStatus());
+
+    TFetchResultsReq fetchReq = new TFetchResultsReq(
+        execResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
+    TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+    verifySuccess(fetchResp.getStatus());
+    List<TColumn> columns = fetchResp.getResults().getColumns();
+    assertEquals(1, columns.size());
+    assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+
+    return execResp.getOperationHandle();
+  }
+
+  private void verifyJwtAuthMetrics(long expectedAuthSuccess, long expectedAuthFailure)
+      throws Exception {
+    long actualAuthSuccess =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-success");
+    assertEquals(expectedAuthSuccess, actualAuthSuccess);
+    long actualAuthFailure =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-failure");
+    assertEquals(expectedAuthFailure, actualAuthFailure);
+  }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the HTTP hiveserver2 endpoint.
+   * Since we don't have a Java version of the JWT library, we use a pre-calculated JWT
+   * token and JWKS. The token and JWK set used in this test case were generated by the
+   * BE unit-test function JwtUtilTest::VerifyJwtRS256.
+   */
+  @Test
+  public void testJwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+        "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+            + "--jwt_allow_without_tls=true",
+        jwksFilename));
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    headers.put("Authorization", "Bearer " + jwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'impala' from JWT token and use it as
+    // login user.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyJwtAuthMetrics(1, 0);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyJwtAuthMetrics(3, 0);
+
+    // Case 2: Authentication fails with an invalid JWT token that has no signature.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    headers.put("Authorization", "Bearer " + invalidJwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+
+    // Case 3: Authentication fails without a "Bearer" token.
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      // JWT authentication is not invoked.
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+
+    // Case 4: Authentication fails without an "Authorization" header.
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      // JWT authentication is not invoked.
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+  }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the HTTP hiveserver2 endpoint.
+   */
+  @Test
+  public void testJwtAuthNotVerifySig() throws Exception {
+    // Start Impala with jwt_validate_signature set to false so that the signature of
+    // the JWT token will not be validated.
+    setUp("--jwt_token_auth=true --jwt_validate_signature=false "
+        + "--jwt_allow_without_tls=true");
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    // The Token was generated by BE unit-test function JwtUtilTest::VerifyJwtRS256.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    headers.put("Authorization", "Bearer " + jwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'impala' from JWT token.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyJwtAuthMetrics(1, 0);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyJwtAuthMetrics(3, 0);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java b/fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java
new file mode 100644
index 0000000..5a798b4
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.apache.impala.testutil.LdapUtil.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Range;
+import org.apache.impala.util.Metrics;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests that Web Server works as expected when JWT authentication is being used.
+ */
+public class JwtWebserverTest {
+  private static final Range<Long> zero = Range.closed(0L, 0L);
+
+  Metrics metrics_ = new Metrics(TEST_USER_1, TEST_PASSWORD_1);
+
+  public void setUp(String extraArgs, String startArgs) throws Exception {
+    Map<String, String> env = new HashMap<>();
+    env.put("IMPALA_WEBSERVER_USERNAME", TEST_USER_1);
+    env.put("IMPALA_WEBSERVER_PASSWORD", TEST_PASSWORD_1);
+    int ret = CustomClusterRunner.StartImpalaCluster(extraArgs, env, startArgs);
+    assertEquals(ret, 0);
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    metrics_.Close();
+  }
+
+  private void verifyJwtAuthMetrics(
+      Range<Long> expectedAuthSuccess, Range<Long> expectedAuthFailure) throws Exception {
+    long actualAuthSuccess =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-success");
+    assertTrue("Expected: " + expectedAuthSuccess + ", Actual: " + actualAuthSuccess,
+        expectedAuthSuccess.contains(actualAuthSuccess));
+    long actualAuthFailure =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-failure");
+    assertTrue("Expected: " + expectedAuthFailure + ", Actual: " + actualAuthFailure,
+        expectedAuthFailure.contains(actualAuthFailure));
+  }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the Web Server.
+   * Since we don't have a Java version of the JWT library, we use a pre-calculated JWT
+   * token and JWKS. The token and JWK set used in this test case were generated by the
+   * BE unit-test function JwtUtilTest::VerifyJwtRS256.
+   */
+  @Test
+  public void testWebserverJwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+              "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+                  + "--jwt_allow_without_tls=true",
+              jwksFilename),
+        "");
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    attemptConnection("Bearer " + jwtToken, "127.0.0.1");
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), zero);
+
+    // Case 2: Failed with invalid JWT Token.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    try {
+      attemptConnection("Bearer " + invalidJwtToken, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+
+    // Case 3: Failed without "Bearer" token.
+    try {
+      attemptConnection("Basic VGVzdDFMZGFwOjEyMzQ1", "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    // JWT authentication is not invoked.
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+
+    // Case 4: Failed without "Authorization" header.
+    try {
+      attemptConnection(null, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    // JWT authentication is not invoked.
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+  }
+
+  // Helper method to make a GET request to the Web Server using the given Authorization
+  // header value and X-Forwarded-For address.
+  private void attemptConnection(String auth_token, String xff_address) throws Exception {
+    String url = "http://localhost:25000/?json";
+    URLConnection connection = new URL(url).openConnection();
+    if (auth_token != null) connection.setRequestProperty("Authorization", auth_token);
+    if (xff_address != null) {
+      connection.setRequestProperty("X-Forwarded-For", xff_address);
+    }
+    connection.getInputStream();
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index 91950aa..8757b1e 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -123,6 +124,18 @@ public class LdapHS2Test {
     assertEquals(expectedAuthSuccess, actualAuthSuccess);
   }
 
+  private void verifyJwtAuthMetrics(long expectedAuthSuccess, long expectedAuthFailure)
+      throws Exception {
+    long actualAuthSuccess =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-success");
+    assertEquals(expectedAuthSuccess, actualAuthSuccess);
+    long actualAuthFailure =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-failure");
+    assertEquals(expectedAuthFailure, actualAuthFailure);
+  }
+
   /**
    * Tests LDAP authentication to the HTTP hiveserver2 endpoint.
    */
@@ -404,4 +417,66 @@ public class LdapHS2Test {
     verifyMetrics(4, 1);
     verifyTrustedDomainMetrics(6);
   }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the HTTP hiveserver2 endpoint.
+   */
+  @Test
+  public void testHiveserver2JwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+        "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+            + "--jwt_allow_without_tls=true",
+        jwksFilename));
+    verifyMetrics(0, 0);
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    headers.put("Authorization", "Bearer " + jwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'impala' from JWT token and use it as
+    // login user.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyMetrics(0, 0);
+    verifyJwtAuthMetrics(1, 0);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyMetrics(0, 0);
+    verifyJwtAuthMetrics(3, 0);
+
+    // Case 2: Authentication fails with an invalid JWT token that has no signature.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    headers.put("Authorization", "Bearer " + invalidJwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
index dc6fd89..6e68db6 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
@@ -105,6 +106,18 @@ public class LdapWebserverTest {
         expectedSuccess.contains(actualSuccess));
   }
 
+  private void verifyJwtAuthMetrics(
+      Range<Long> expectedAuthSuccess, Range<Long> expectedAuthFailure) throws Exception {
+    long actualAuthSuccess =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-success");
+    assertTrue("Expected: " + expectedAuthSuccess + ", Actual: " + actualAuthSuccess,
+        expectedAuthSuccess.contains(actualAuthSuccess));
+    long actualAuthFailure =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-failure");
+    assertTrue("Expected: " + expectedAuthFailure + ", Actual: " + actualAuthFailure,
+        expectedAuthFailure.contains(actualAuthFailure));
+  }
+
   @Test
   public void testWebserver() throws Exception {
     setUp("", "");
@@ -245,6 +258,45 @@ public class LdapWebserverTest {
     verifyTrustedDomainMetrics(Range.closed(successMetricBefore, successMetricBefore));
   }
 
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the Web Server.
+   */
+  @Test
+  public void testWebserverJwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+              "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+                  + "--jwt_allow_without_tls=true",
+              jwksFilename),
+        "");
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    attemptConnection("Bearer " + jwtToken, "127.0.0.1");
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), zero);
+
+    // Case 2: Failed with invalid JWT Token.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    try {
+      attemptConnection("Bearer " + invalidJwtToken, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+  }
+
   // Helper method to make a get call to the webserver using the input basic
   // auth token and x-forward-for token.
   private void attemptConnection(String basic_auth_token, String xff_address)
diff --git a/testdata/jwt/jwks_rs256.json b/testdata/jwt/jwks_rs256.json
new file mode 100644
index 0000000..95483ba
--- /dev/null
+++ b/testdata/jwt/jwks_rs256.json
@@ -0,0 +1,7 @@
+
+{
+  "keys": [
+    { "use": "sig", "kty": "RSA", "kid": "public:c424b67b-fe28-45d7-b015-f79da50b5b21", "alg": "RS256", "n": "uGbXWiK3dQTyCbX5xdE4yCuYp0AF2d15Qq1JSXT_lx8CEcXb9RbDddl8jGDv-spi5qPa8qEHiK7FwV2KpRE983wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qea1bXASuvYXEpQNpGbnTGVsWXI9C-yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTFIF9Dm4SLHbphUS2iw7w1JgT69s7of9-I9l5lsJ9cozf1rxrXX4V1u_SotUuNB3Fp8oB4C1fLBEhSlMcUJirz1E8AziMCxS-VrRPDM-zfvpIJg3JljAh3PJHDiLu902v9w-Iplu1WyoB2aPfitxEhRN0Yw", "e": "AQAB" },
+    { "use": "sig", "kty": "RSA", "kid": "public:9b9d0b47-b9ed-4ba6-9180-52fc5b161a3a", "alg": "RS256", "n": "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vdi_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0iwzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKhCTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-XkiCcrkyS1cnghnllh-LCwQu1sYw", "e": "AQAB" }
+  ]
+}

[impala] 02/02: IMPALA-10738: Min/max filters should be enabled for partition columns

Posted by st...@apache.org.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 84d784351c7a3606ec86abf6ea757aef72687b55
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Wed Jun 9 13:52:45 2021 -0400

    IMPALA-10738: Min/max filters should be enabled for partition columns
    
    This patch enables min/max filters for partition columns to take
    advantage of the existing min/max filter infrastructure and to
    provide coverage for certain equi-joins in which the stats filters
    are not feasible.
    
    The new feature is on by default. To turn it off, set the new query
    option minmax_filter_partition_columns to false.
    
    In the patch, the existing query option enabled_runtime_filter_types
    is enforced in specifying the types of the filters generated. The
    default value ALL generates both the bloom and min/max filters. The
    alternative value BLOOM generates only the bloom filters and another
    alternative value MIN_MAX generates only the min/max filters.
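
    The type selection described above can be sketched roughly as follows.
    This is a minimal, self-contained illustration rather than the patch's
    code: FilterTypeSketch, FilterType and EnabledTypes are hypothetical
    stand-ins for the planner class and the Thrift enums
    TRuntimeFilterType and TEnabledRuntimeFilterTypes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the planner logic; not Impala's actual classes.
class FilterTypeSketch {
  // Kinds of runtime filters the planner may generate.
  enum FilterType { BLOOM, MIN_MAX }

  // Values of the enabled_runtime_filter_types query option.
  enum EnabledTypes { BLOOM, MIN_MAX, ALL }

  // Returns true if 'type' may be generated under the option value 'enabled'.
  static boolean isEnabled(FilterType type, EnabledTypes enabled) {
    if (enabled == EnabledTypes.ALL) return true;
    return (type == FilterType.BLOOM && enabled == EnabledTypes.BLOOM)
        || (type == FilterType.MIN_MAX && enabled == EnabledTypes.MIN_MAX);
  }

  // Lists the filter types that would be generated for the option value.
  static List<FilterType> generatedTypes(EnabledTypes enabled) {
    List<FilterType> result = new ArrayList<>();
    for (FilterType type : FilterType.values()) {
      if (isEnabled(type, enabled)) result.add(type);
    }
    return result;
  }

  public static void main(String[] args) {
    for (EnabledTypes enabled : EnabledTypes.values()) {
      System.out.println(enabled + " -> " + generatedTypes(enabled));
    }
  }
}
```

    Checking the option once per filter type, before any join conjunct is
    examined, means a disabled type never produces a candidate filter.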
    
    The normal control knobs minmax_filter_threshold (for threshold) and
    minmax_filtering_level (for filtering level) still work. When the
    threshold is 0, the patch automatically assigns a reasonable value
    for the threshold.
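
    How these options interact can be sketched roughly as below. This is a
    simplified illustration based on the diff that follows, not the actual
    planner code: MinMaxOptionsSketch and its Options class are
    hypothetical, and 0.5 is the threshold value the Planner.java hunk
    assigns when the threshold is 0 and the partition column option is on.

```java
// Hypothetical stand-in for the relevant query options; not Impala's classes.
class MinMaxOptionsSketch {
  static final class Options {
    boolean partitionColumns;  // minmax_filter_partition_columns
    boolean sortedColumns;     // minmax_filter_sorted_columns
    double threshold;          // minmax_filter_threshold

    Options(boolean partitionColumns, boolean sortedColumns, double threshold) {
      this.partitionColumns = partitionColumns;
      this.sortedColumns = sortedColumns;
      this.threshold = threshold;
    }
  }

  // Assigns a default threshold when partition column filtering is enabled
  // and the threshold was left at 0.
  static void normalize(Options options) {
    if (options.partitionColumns && options.threshold == 0.0) {
      options.threshold = 0.5;
    }
  }

  // Decides whether a min/max filter on a target column is allowed.
  static boolean allowFilter(Options options, boolean boundByPartitionColumns,
      boolean leadingLexicalSortedColumn) {
    // Partition columns are governed only by their own option.
    if (boundByPartitionColumns) return options.partitionColumns;
    // Leading lexically sorted columns are governed only by their own option.
    if (leadingLexicalSortedColumn) return options.sortedColumns;
    // Any other column needs a positive threshold.
    return options.threshold > 0.0;
  }

  public static void main(String[] args) {
    Options options = new Options(true, false, 0.0);
    normalize(options);
    System.out.println("threshold = " + options.threshold);
    System.out.println("partition column allowed = "
        + allowFilter(options, true, false));
  }
}
```

    One consequence worth noting in this sketch: with the partition column
    option left on, a threshold of 0 is replaced by the default, so filters
    on ordinary columns are allowed as well.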
    
    Testing:
      1). Added new tests in
          overlap_min_max_filters_on_partition_columns.test;
      2). Core tests
    
    Change-Id: I89e135ef48b4bb36d70075287b03d1c12496b042
    Reviewed-on: http://gerrit.cloudera.org:8080/17568
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scan-node-base.cc                 |  3 +-
 be/src/exec/hdfs-scanner.h                         |  3 ++
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 12 +++++
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  4 ++
 be/src/runtime/runtime-filter.h                    |  5 ++
 be/src/service/query-options.cc                    |  7 ++-
 be/src/service/query-options.h                     |  6 ++-
 common/thrift/ImpalaService.thrift                 |  3 ++
 common/thrift/Query.thrift                         |  4 ++
 .../org/apache/impala/planner/HdfsScanNode.java    | 63 +++++++++++++++-------
 .../java/org/apache/impala/planner/Planner.java    |  5 ++
 .../impala/planner/RuntimeFilterGenerator.java     | 34 ++++++++----
 .../org/apache/impala/planner/PlannerTest.java     |  4 ++
 .../apache/impala/planner/TpcdsPlannerTest.java    |  2 +
 .../PlannerTest/bloom-filter-assignment.test       | 16 +++---
 ...-runtime-filters-hdfs-num-rows-est-enabled.test | 53 +++++++++---------
 .../PlannerTest/min-max-runtime-filters.test       | 29 +++++-----
 .../PlannerTest/runtime-filter-query-options.test  |  4 +-
 .../queries/QueryTest/overlap_min_max_filters.test | 14 ++---
 ...erlap_min_max_filters_on_partition_columns.test | 61 +++++++++++++++++++++
 .../queries/QueryTest/runtime_filters.test         |  1 +
 .../queries/QueryTest/runtime_filters_mt_dop.test  |  1 +
 tests/query_test/test_runtime_filters.py           | 12 ++++-
 23 files changed, 251 insertions(+), 95 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index acdf679..82bea13 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -919,8 +919,7 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
   if (template_tuple == nullptr) return true;
   TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
   for (const FilterContext& ctx: filter_ctxs) {
-    int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
-    if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
+    if (!ctx.filter->IsBoundByPartitionColumn(id_)) {
       continue;
     }
 
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index e3674d3..6402bbf 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -175,6 +175,9 @@ class HdfsScanner {
     return eos_;
   }
 
+  /// Return the plan id of the scan node that this scanner is associated with.
+  int GetScanNodeId() const { return scan_node_->id(); }
+
   /// Not inlined in IR so it can be replaced with a constant.
   int IR_NO_INLINE tuple_byte_size() const { return tuple_byte_size_; }
   int IR_NO_INLINE tuple_byte_size(const TupleDescriptor& desc) const {
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 43cc3a9..ff34eee 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -674,6 +674,11 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
     /// filter_ctxs_ and filter_stats_.
     int idx = FindFilterIndex(filter_id);
     DCHECK(idx >= 0);
+
+    if (IsBoundByPartitionColumn(idx)) {
+      continue;
+    }
+
     MinMaxFilter* minmax_filter = FindMinMaxFilter(idx);
 
     VLOG(3) << "Try to filter out a rowgroup via overlap predicate filter: "
@@ -1306,6 +1311,13 @@ MinMaxFilter* HdfsParquetScanner::FindMinMaxFilter(int filter_idx) {
   return nullptr;
 }
 
+bool HdfsParquetScanner::IsBoundByPartitionColumn(int filter_idx) {
+  DCHECK_LE(0, filter_idx);
+  DCHECK_LT(filter_idx, filter_ctxs_.size());
+  const RuntimeFilter* filter = filter_ctxs_[filter_idx]->filter;
+  return filter->IsBoundByPartitionColumn(GetScanNodeId());
+}
+
 bool HdfsParquetScanner::IsFilterWorthyForOverlapCheck(int filter_idx) {
   if (filter_idx >= 0 && filter_idx < filter_stats_.size()) {
     LocalFilterStats* stats = &filter_stats_[filter_idx];
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 5e99ffc..05a2552 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -614,6 +614,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Return nullptr if no min/max filter is found at that location.
   MinMaxFilter* FindMinMaxFilter(int filter_idx);
 
+  /// Return true when the filter at filter_ctxs_[filter_idx] is bound by a
+  /// partition column and false otherwise.
+  bool IsBoundByPartitionColumn(int filter_idx);
+
   /// Return the memory addresses of the min and the max slot in min_max_tuple_ at
   /// location overlap_slot_idx and overlap_slot_idx+1.
   void GetMinMaxSlotsForOverlapPred(
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 8f494cf..68508ab 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -112,6 +112,11 @@ class RuntimeFilter {
   inline bool AlwaysTrue() const;
   inline bool AlwaysFalse() const;
 
+  bool IsBoundByPartitionColumn(int plan_id) const {
+    int target_ndx = filter_desc().planid_to_target_ndx.at(plan_id);
+    return filter_desc().targets[target_ndx].is_bound_by_partition_columns;
+  }
+
   /// Frequency with which to check for filter arrival in WaitForArrival()
   static const int SLEEP_PERIOD_MS;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 66a3286..5ba518a 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1093,8 +1093,8 @@ Status impala::SetQueryOption(const string& key, const string& value,
          break;
       }
       case TImpalaQueryOptions::MINMAX_FILTER_SORTED_COLUMNS: {
-         query_options->__set_minmax_filter_sorted_columns(IsTrue(value));
-         break;
+        query_options->__set_minmax_filter_sorted_columns(IsTrue(value));
+        break;
       }
       case TImpalaQueryOptions::MINMAX_FILTER_FAST_CODE_PATH: {
         TMinmaxFilterFastCodePathMode::type enum_type;
@@ -1107,6 +1107,9 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_enable_kudu_transaction(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS:
+        query_options->__set_minmax_filter_partition_columns(IsTrue(value));
+        break;
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 408f1c4..943e906 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ENABLE_KUDU_TRANSACTION + 1);\
+      TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -255,7 +255,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(enable_kudu_transaction, ENABLE_KUDU_TRANSACTION,\
       TQueryOptionLevel::DEVELOPMENT)\
-;
+  QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS,\
+      TQueryOptionLevel::ADVANCED)\
+  ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index f02e527..94109e0 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -681,6 +681,9 @@ enum TImpalaQueryOptions {
 
   // If true, Kudu's multi-row transaction is enabled.
   ENABLE_KUDU_TRANSACTION = 132
+
+  // Indicates whether to use min/max filtering on partition columns
+  MINMAX_FILTER_PARTITION_COLUMNS = 133
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 34198ed..d061578 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -523,7 +523,11 @@ struct TQueryOptions {
   132: optional TMinmaxFilterFastCodePathMode minmax_filter_fast_code_path =
       TMinmaxFilterFastCodePathMode.ON;
 
+  // See comment in ImpalaService.thrift
   133: optional bool enable_kudu_transaction = false;
+
+  // See comment in ImpalaService.thrift
+  134: optional bool minmax_filter_partition_columns = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0bfd9de..9ef52bb 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -658,7 +658,9 @@ public class HdfsScanNode extends ScanNode {
     minMaxTuple_.computeMemLayout();
   }
 
-  // Init the necessary data structures prior to the detection of overlap predicates.
+  /**
+   * Init the necessary data structures prior to the detection of overlap predicates.
+   */
   public void initOverlapPredicate(Analyzer analyzer) {
     if (!allParquet_) return;
     Preconditions.checkNotNull(minMaxTuple_);
@@ -668,7 +670,9 @@ public class HdfsScanNode extends ScanNode {
     overlap_first_slot_idx_ = minMaxTuple_.getSlots().size();
   }
 
-  // Data type check on the slot type and the join column type.
+  /**
+   * Data type check on the slot type and the join column type.
+   */
   private boolean checkTypeForOverlapPredicate(Type slotType, Type joinType) {
     // Both slotType and joinType must be Boolean at the same time.
     if (slotType.isBoolean() && joinType.isBoolean()) {
@@ -718,11 +722,33 @@ public class HdfsScanNode extends ScanNode {
     return false;
   }
 
+  /**
+   * Determine if a runtime filter should be allowed given the relevant query options.
+   */
+  private boolean allowRuntimeFilter(TQueryOptions queryOptions,
+      boolean isBoundByPartitionColumns, boolean isLeadingLexicalSortedColumn) {
+    boolean minmaxOnPartitionColumns = queryOptions.isMinmax_filter_partition_columns();
+    boolean minmaxOnSortedColumns = queryOptions.isMinmax_filter_sorted_columns();
+
+    // Allow min/max filters on partition columns only when enabled.
+    if (isBoundByPartitionColumns) {
+      return minmaxOnPartitionColumns;
+    }
+
+    // Allow min/max filters on sorted columns only when enabled.
+    if (isLeadingLexicalSortedColumn) {
+      return minmaxOnSortedColumns;
+    }
+
+    // Allow min/max filters if the threshold value > 0.0.
+    return queryOptions.getMinmax_filter_threshold() > 0.0;
+  }
+
   // Try to compute the overlap predicate for the filter. Return true if an overlap
   // predicate can be formed utilizing the min/max filter 'filter' against the
   // target expr 'targetExpr'. Return false otherwise.
-  public Boolean tryToComputeOverlapPredicate(
-      Analyzer analyzer, RuntimeFilter filter, Expr targetExpr) {
+  public Boolean tryToComputeOverlapPredicate(Analyzer analyzer, RuntimeFilter filter,
+      Expr targetExpr, boolean isBoundByPartitionColumns) {
     // This optimization is only valid for min/max filters and Parquet tables.
     if (filter.getType() != TRuntimeFilterType.MIN_MAX) return false;
     if (!allParquet_) return false;
@@ -740,24 +766,21 @@ public class HdfsScanNode extends ScanNode {
         return false;
     }
 
-    // Check if query option minmax_filter_sorted_columns is true and the
-    // target column is the leading sort-by column in lexical sort order.
-    // If so, allow the process to continue.
-    if (analyzer.getQueryOptions().isMinmax_filter_sorted_columns()) {
-      Column column = slotRefInScan.getDesc().getColumn();
-      if (column != null) {
-        TupleDescriptor tDesc = slotRefInScan.getDesc().getParent();
-        FeTable table = tDesc.getTable();
-        if (table != null && table instanceof FeFsTable) {
-          if (!((FeFsTable) table).isLeadingSortByColumn(column.getName())) {
-            return false;
-          }
-          if ((((FeFsTable) table).IsLexicalSortByColumn()) == false) {
-            return false;
-          }
-        }
+    boolean isLeadingLexicalSortedColumn = false;
+    Column column = slotRefInScan.getDesc().getColumn();
+    if (column != null) {
+      TupleDescriptor tDesc = slotRefInScan.getDesc().getParent();
+      FeTable table = tDesc.getTable();
+      if (table != null && table instanceof FeFsTable) {
+        isLeadingLexicalSortedColumn =
+            ((FeFsTable) table).isLeadingSortByColumn(column.getName())
+            && ((FeFsTable) table).IsLexicalSortByColumn();
       }
     }
+    if (!allowRuntimeFilter(analyzer.getQueryOptions(), isBoundByPartitionColumns,
+            isLeadingLexicalSortedColumn)) {
+      return false;
+    }
 
     Expr srcExpr = filter.getSrcExpr();
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 4d4919a..42205ef 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -686,6 +686,11 @@ public class Planner {
         queryOptions.setMinmax_filtering_level(TMinmaxFilteringLevel.PAGE);
       }
     }
+    if (queryOptions.isMinmax_filter_partition_columns()) {
+      if (queryOptions.getMinmax_filter_threshold() == 0.0) {
+        queryOptions.setMinmax_filter_threshold(0.5);
+      }
+    }
   }
 
   public static void checkForDisableCodegen(PlanNode distributedPlan,
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 840715f..6dce255 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -707,6 +707,19 @@ public final class RuntimeFilterGenerator {
   }
 
   /**
+   * Returns true if filter type 'filterType' is enabled in the context of the enabled
+   * runtime types 'enabledRuntimeFilterTypes'. Return false otherwise.
+   */
+  public boolean isRuntimeFilterTypeEnabled(TRuntimeFilterType filterType,
+      TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes) {
+    if (enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.ALL) return true;
+    return (filterType == TRuntimeFilterType.BLOOM
+            && enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.BLOOM
+        || filterType == TRuntimeFilterType.MIN_MAX
+            && enabledRuntimeFilterTypes == TEnabledRuntimeFilterTypes.MIN_MAX);
+  }
+
+  /**
    * Generates the runtime filters for a query by recursively traversing the distributed
    * plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate
    * runtime filters are generated from equi-join predicates assigned to hash-join nodes.
@@ -728,7 +741,10 @@ public final class RuntimeFilterGenerator {
       joinConjuncts.addAll(joinNode.getConjuncts());
 
       List<RuntimeFilter> filters = new ArrayList<>();
+      TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes =
+          ctx.getQueryOptions().getEnabled_runtime_filter_types();
       for (TRuntimeFilterType filterType : TRuntimeFilterType.values()) {
+        if (!isRuntimeFilterTypeEnabled(filterType, enabledRuntimeFilterTypes)) continue;
         for (Expr conjunct : joinConjuncts) {
           RuntimeFilter filter =
               RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
@@ -842,8 +858,9 @@ public final class RuntimeFilterGenerator {
     boolean disableRowRuntimeFiltering =
         ctx.getQueryOptions().isDisable_row_runtime_filtering();
     boolean disable_overlap_filter =
-        !(ctx.getQueryOptions().isMinmax_filter_sorted_columns()
-            || ctx.getQueryOptions().getMinmax_filter_threshold() > 0.0);
+        !ctx.getQueryOptions().isMinmax_filter_sorted_columns()
+        && !ctx.getQueryOptions().isMinmax_filter_partition_columns()
+        && ctx.getQueryOptions().getMinmax_filter_threshold() == 0.0;
     TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode();
     TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes =
         ctx.getQueryOptions().getEnabled_runtime_filter_types();
@@ -875,16 +892,13 @@ public final class RuntimeFilterGenerator {
           if (!allow_min_max) {
             continue;
           }
-          // TODO: Apply min/max filters on partition columns.
-          if (isBoundByPartitionColumns) {
-            continue;
-          }
           if (!disable_overlap_filter) {
-            // If the filter is not defined on partition columns, try to compute
-            // an overlap predicate for it. This predicate will be used to filter
-            // out row groups or pages in Parquet data files.
+            // Try to compute an overlap predicate for the filter. This predicate will be
+            // used to filter out partitions, or row groups, pages or rows in Parquet data
+            // files.
             if (!((HdfsScanNode) scanNode)
-                .tryToComputeOverlapPredicate(analyzer, filter, targetExpr)) {
+                     .tryToComputeOverlapPredicate(
+                         analyzer, filter, targetExpr, isBoundByPartitionColumns)) {
               continue;
             }
           } else {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index ef5e2f6..22c7dd0 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -623,6 +623,9 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testBloomFilterAssignment() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setMinmax_filter_sorted_columns(false);
+    options.setMinmax_filter_partition_columns(false);
     runPlannerTestFile("bloom-filter-assignment",
         ImmutableSet.of(
             PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
@@ -903,6 +906,7 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setDisable_hdfs_num_rows_estimate(true);
+    options.setMinmax_filter_partition_columns(false);
     options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.MIN_MAX);
     runPlannerTestFile("min-max-runtime-filters", options);
   }
diff --git a/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java b/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java
index 356b203..14b9d3d 100644
--- a/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TpcdsPlannerTest.java
@@ -49,6 +49,8 @@ public class TpcdsPlannerTest extends PlannerTestBase {
     options.setMinmax_filter_threshold(0.5);
     /* Disable minmax filter on sorted columns. */
     options.setMinmax_filter_sorted_columns(false);
+    /* Disable minmax filter on partition columns. */
+    options.setMinmax_filter_partition_columns(false);
   }
 
   @Test
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
index cc53eca..9aa82ec 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
@@ -313,7 +313,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = b.int_col
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF002[bloom] <- b.int_col
+|  runtime filters: RF001[bloom] <- b.int_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=8B cardinality=12.79K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -326,7 +326,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes a]
    HDFS partitions=24/24 files=24 size=201.11KB
-   runtime filters: RF000[bloom] -> a.int_col, RF002[bloom] -> a.int_col
+   runtime filters: RF000[bloom] -> a.int_col, RF001[bloom] -> a.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=12.84K
@@ -384,7 +384,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = b.int_col
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF002[bloom] <- b.int_col
+|  runtime filters: RF001[bloom] <- b.int_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=8B cardinality=12.79K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -396,7 +396,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes a]
    HDFS partitions=24/24 files=24 size=201.11KB
-   runtime filters: RF000[bloom] -> a.int_col + 1, RF002[bloom] -> a.int_col
+   runtime filters: RF000[bloom] -> a.int_col + 1, RF001[bloom] -> a.int_col
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=12.84K
@@ -454,7 +454,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.timestamp_col = b.timestamp_col
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF003[bloom] <- b.timestamp_col
+|  runtime filters: RF002[bloom] <- b.timestamp_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=32B cardinality=12.88K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -467,7 +467,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes a]
    HDFS partitions=24/24 files=24 size=202.50KB
-   runtime filters: RF000[bloom] -> a.timestamp_col, RF003[bloom] -> a.timestamp_col
+   runtime filters: RF000[bloom] -> a.timestamp_col, RF002[bloom] -> a.timestamp_col
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=12.84K
@@ -517,7 +517,7 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.timestamp_col = b.timestamp_col
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF003[bloom] <- b.timestamp_col
+|  runtime filters: RF002[bloom] <- b.timestamp_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=32B cardinality=12.88K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -530,7 +530,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes a]
    HDFS partitions=24/24 files=24 size=202.50KB
-   runtime filters: RF000[bloom] -> a.timestamp_col, RF003[bloom] -> a.timestamp_col
+   runtime filters: RF000[bloom] -> a.timestamp_col, RF002[bloom] -> a.timestamp_col
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=12.84K
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
index 37fe50b..e5aa227 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
@@ -3,29 +3,29 @@
 # The following are the hdfs tables without available statistics:
 # functional_parquet.alltypes
 # Query with both Kudu and HDFS filter targets.
-select count(*) from functional_kudu.alltypes a, functional_parquet.alltypes b,
+select straight_join count(*) from functional_kudu.alltypes a, functional_parquet.alltypes b,
     functional_kudu.alltypes c
 where a.int_col = b.int_col and a.int_col = c.int_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=23.38MB mem-reservation=5.89MB thread-reservation=4 runtime-filters-memory=2.00MB
+|  Per-Host Resources: mem-estimate=21.38MB mem-reservation=4.00MB thread-reservation=4
 PLAN-ROOT SINK
 |  output exprs: count(*)
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
-|  mem-estimate=100.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=8B cardinality=1
-|  in pipelines: 05(GETNEXT), 01(OPEN)
+|  in pipelines: 05(GETNEXT), 00(OPEN)
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col
 |  fk/pk conjuncts: none
-|  runtime filters: RF000[bloom] <- c.int_col, RF001[min_max] <- c.int_col
+|  runtime filters: RF000[min_max] <- c.int_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
-|  tuple-ids=1,0,2 row-size=12B cardinality=9.30M
-|  in pipelines: 01(GETNEXT), 02(OPEN)
+|  tuple-ids=0,1,2 row-size=12B cardinality=5.33M
+|  in pipelines: 00(GETNEXT), 02(OPEN)
 |
 |--02:SCAN KUDU [functional_kudu.alltypes c]
 |     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
@@ -33,28 +33,27 @@ PLAN-ROOT SINK
 |     in pipelines: 02(GETNEXT)
 |
 03:HASH JOIN [INNER JOIN]
-|  hash predicates: b.int_col = a.int_col
+|  hash predicates: a.int_col = b.int_col
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF002[bloom] <- a.int_col
+|  runtime filters: RF001[min_max] <- b.int_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
-|  tuple-ids=1,0 row-size=8B cardinality=12.74K
-|  in pipelines: 01(GETNEXT), 00(OPEN)
+|  tuple-ids=0,1 row-size=8B cardinality=7.30K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
-|--00:SCAN KUDU [functional_kudu.alltypes a]
-|     runtime filters: RF001[min_max] -> a.int_col
-|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
-|     tuple-ids=0 row-size=4B cardinality=7.30K
-|     in pipelines: 00(GETNEXT)
+|--01:SCAN HDFS [functional_parquet.alltypes b]
+|     HDFS partitions=24/24 files=24 size=201.59KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=12.82K
+|       columns: unavailable
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
+|     tuple-ids=1 row-size=4B cardinality=12.82K
+|     in pipelines: 01(GETNEXT)
 |
-01:SCAN HDFS [functional_parquet.alltypes b]
-   HDFS partitions=24/24 files=24 size=200.33KB
-   runtime filters: RF000[bloom] -> b.int_col, RF002[bloom] -> b.int_col
-   stored statistics:
-     table: rows=unavailable size=unavailable
-     partitions: 0/24 rows=12.84K
-     columns: unavailable
-   extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
-   tuple-ids=1 row-size=4B cardinality=12.74K
-   in pipelines: 01(GETNEXT)
+00:SCAN KUDU [functional_kudu.alltypes a]
+   runtime filters: RF000[min_max] -> a.int_col, RF001[min_max] -> a.int_col
+   mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+   tuple-ids=0 row-size=4B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
index f436ff7..bb9f5ec 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
@@ -17,7 +17,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.string_col = b.string_col, a.int_col = b.tinyint_col + 1
 |  fk/pk conjuncts: none
-|  runtime filters: RF002[min_max] <- b.string_col, RF003[min_max] <- b.tinyint_col + 1
+|  runtime filters: RF000[min_max] <- b.string_col, RF001[min_max] <- b.tinyint_col + 1
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=39B cardinality=5.84K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -28,7 +28,7 @@ PLAN-ROOT SINK
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
-   runtime filters: RF002[min_max] -> a.string_col, RF003[min_max] -> a.int_col
+   runtime filters: RF000[min_max] -> a.string_col, RF001[min_max] -> a.int_col
    mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=21B cardinality=7.30K
    in pipelines: 00(GETNEXT)
@@ -92,7 +92,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: CAST(a.float_col AS DOUBLE) = b.double_col, CAST(a.int_col AS SMALLINT) = b.smallint_col, a.tinyint_col = b.bigint_col, a.string_col = b.timestamp_col
 |  fk/pk conjuncts: a.tinyint_col = b.bigint_col, a.string_col = b.timestamp_col
-|  runtime filters: RF007[min_max] <- b.bigint_col
+|  runtime filters: RF002[min_max] <- b.bigint_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=60B cardinality=1.46K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -103,7 +103,7 @@ PLAN-ROOT SINK
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
-   runtime filters: RF007[min_max] -> a.tinyint_col
+   runtime filters: RF002[min_max] -> a.tinyint_col
    mem-estimate=3.00MB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=26B cardinality=7.30K
    in pipelines: 00(GETNEXT)
@@ -114,7 +114,7 @@ select count(*) from functional_kudu.alltypes a, functional_parquet.alltypes b,
 where a.int_col = b.int_col and a.int_col = c.int_col
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=2.02GB mem-reservation=36.95MB thread-reservation=4 runtime-filters-memory=1.00MB
+|  Per-Host Resources: mem-estimate=2.02GB mem-reservation=36.95MB thread-reservation=4
 PLAN-ROOT SINK
 |  output exprs: count(*)
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
@@ -128,7 +128,7 @@ PLAN-ROOT SINK
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = c.int_col
 |  fk/pk conjuncts: none
-|  runtime filters: RF000[bloom] <- c.int_col, RF001[min_max] <- c.int_col
+|  runtime filters: RF000[min_max] <- c.int_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1,2 row-size=12B cardinality=5.33M
 |  in pipelines: 00(GETNEXT), 02(OPEN)
@@ -141,14 +141,13 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.int_col = b.int_col
 |  fk/pk conjuncts: assumed fk/pk
-|  runtime filters: RF003[min_max] <- b.int_col
+|  runtime filters: RF001[min_max] <- b.int_col
 |  mem-estimate=2.00GB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=8B cardinality=7.30K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN HDFS [functional_parquet.alltypes b]
 |     HDFS partitions=24/24 files=24 size=202.50KB
-|     runtime filters: RF000[bloom] -> b.int_col
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
 |       partitions: 0/24 rows=unavailable
@@ -159,7 +158,7 @@ PLAN-ROOT SINK
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
-   runtime filters: RF001[min_max] -> a.int_col, RF003[min_max] -> a.int_col
+   runtime filters: RF000[min_max] -> a.int_col, RF001[min_max] -> a.int_col
    mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=7.30K
    in pipelines: 00(GETNEXT)
@@ -174,18 +173,18 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=6.88MB mem-reservation=3.88MB thread-reservation=4
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=100.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
-|  mem-estimate=100.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=8B cardinality=1
 |  in pipelines: 05(GETNEXT), 00(OPEN)
 |
 04:HASH JOIN [INNER JOIN]
 |  hash predicates: b.id_col = c.id_col
 |  fk/pk conjuncts: b.id_col = c.id_col
-|  runtime filters: RF001[min_max] <- c.id_col
+|  runtime filters: RF000[min_max] <- c.id_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1,2 row-size=16B cardinality=30
 |  in pipelines: 00(GETNEXT), 02(OPEN)
@@ -198,19 +197,19 @@ PLAN-ROOT SINK
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: a.date_col = b.date_col
 |  fk/pk conjuncts: none
-|  runtime filters: RF003[min_max] <- b.date_col
+|  runtime filters: RF001[min_max] <- b.date_col
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=12B cardinality=30
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN KUDU [functional_kudu.date_tbl b]
-|     runtime filters: RF001[min_max] -> b.id_col
+|     runtime filters: RF000[min_max] -> b.id_col
 |     mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
 |     tuple-ids=1 row-size=8B cardinality=22
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.date_tbl a]
-   runtime filters: RF003[min_max] -> a.date_col
+   runtime filters: RF001[min_max] -> a.date_col
    mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=22
    in pipelines: 00(GETNEXT)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
index 0d25f65..30ff98c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
@@ -726,7 +726,7 @@ PLAN-ROOT SINK
 02:HASH JOIN [INNER JOIN]
 |  hash predicates: a.id = b.id
 |  fk/pk conjuncts: a.id = b.id
-|  runtime filters: RF001[min_max] <- b.id
+|  runtime filters: RF000[min_max] <- b.id
 |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=0,1 row-size=8B cardinality=7.30K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
@@ -737,7 +737,7 @@ PLAN-ROOT SINK
 |     in pipelines: 01(GETNEXT)
 |
 00:SCAN KUDU [functional_kudu.alltypes a]
-   runtime filters: RF001[min_max] -> a.id
+   runtime filters: RF000[min_max] -> a.id
    mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
    tuple-ids=0 row-size=4B cardinality=7.30K
    in pipelines: 00(GETNEXT)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
index 770133a..a3ae181 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
@@ -140,11 +140,12 @@ aggregation(SUM, NumRuntimeFilteredPages): 120
 ---- QUERY
 ###################################################
 # Repeat the above test with the overlap filtering
-# disabled. Should return the same number of rows.
-# Expect to see 0 pages filtered out.
+# on sorted columns disabled. Should return the
+# same number of rows. Expect to see 0 pages
+# filtered out.
 ###################################################
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
-SET MINMAX_FILTER_THRESHOLD=0.0;
+SET MINMAX_FILTER_SORTED_COLUMNS=false;
 select straight_join count(*)
 from lineitem_sorted_l_shipdate a join [SHUFFLE]
 tpch_parquet.orders b
@@ -185,11 +186,12 @@ aggregation(SUM, NumRuntimeFilteredPages): 255
 ---- QUERY
 ###################################################
 # Join the above sorted column with itself with
-# overlap filtering disabled. Expect to see the same
-# result with 0 pages filtered out.
+# overlap filtering on sorted columns disabled.
+# Expect to see the same result with 0 pages
+# filtered out.
 ###################################################
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
-SET MINMAX_FILTER_THRESHOLD=0.0;
+SET MINMAX_FILTER_SORTED_COLUMNS=false;
 select straight_join count(*)
 from lineitem_sorted_l_extendedprice a join [SHUFFLE]
 lineitem_sorted_l_extendedprice b
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_partition_columns.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_partition_columns.test
new file mode 100644
index 0000000..0c63a40
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters_on_partition_columns.test
@@ -0,0 +1,61 @@
+====
+---- QUERY
+# Run an equi hash join query with the first partitioned column a.year on
+# the fact table side. Only the min/max filters are allowed. Expect to see
+# all files are rejected by the min/max filter.
+set runtime_filter_wait_time_ms=$RUNTIME_FILTER_WAIT_TIME_MS;
+set enabled_runtime_filter_types=MIN_MAX;
+select straight_join count(*) from
+functional_parquet.alltypes a join [SHUFFLE] functional_parquet.alltypes b
+where a.year = b.int_col;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, Files processed): 24
+aggregation(SUM, Files rejected): 24
+====
+---- QUERY
+# Run an equi hash join query with the 2nd partitioned column a.month on
+# the fact table side. Only the min/max filters are allowed. Expect to see
+# all files are rejected by the min/max filter.
+set runtime_filter_wait_time_ms=$RUNTIME_FILTER_WAIT_TIME_MS;
+set enabled_runtime_filter_types=MIN_MAX;
+select straight_join count(*) from
+functional_parquet.alltypes a join [SHUFFLE] functional_parquet.dimtbl b
+where a.month = b.zip;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, Files processed): 24
+aggregation(SUM, Files rejected): 24
+====
+---- QUERY
+# Explain a hash join query with min/max filter on partition columns turned off. Expect
+# one bloom filter only.
+set runtime_filter_wait_time_ms=$RUNTIME_FILTER_WAIT_TIME_MS;
+set minmax_filter_partition_columns=false;
+set explain_level=2;
+explain
+select straight_join count(*) from
+functional_parquet.alltypes a join [SHUFFLE] functional_parquet.alltypes b
+where a.year = b.int_col;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:|   runtime filters: RF000[bloom] -> a.`year`
+====
+---- QUERY
+# Set up a hash join query with min/max filter on the partition column 'month' only.
+# Expect 6 files rejected by the filter, as a total of 24 months' worth of data is
+# stored in 'alltypes' and there are two data files per month. The range in the
+# filter built from b.int_col is [0, 9].
+set runtime_filter_wait_time_ms=$RUNTIME_FILTER_WAIT_TIME_MS;
+set enabled_runtime_filter_types=MIN_MAX;
+set minmax_filter_threshold=0.9;
+select straight_join count(*) from
+functional_parquet.alltypes a join [SHUFFLE] functional_parquet.alltypes b
+where a.month = b.int_col;
+---- RESULTS
+3985800
+---- RUNTIME_PROFILE
+aggregation(SUM, Files processed): 24
+aggregation(SUM, Files rejected): 6
+====
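The expected "Files rejected: 6" in the last test case above can be checked with a small model. This is only an illustrative sketch, not Impala's implementation: it assumes one data file per partition, month values 1 through 12 each appearing twice (24 files), and a filter range of [0, 9] as stated in the test comment. The helper name `rejected_by_min_max` is hypothetical.

```python
def rejected_by_min_max(partition_values, lo, hi):
    """Return the partition values that fall outside the filter range [lo, hi].

    Hypothetical helper: a file is rejected when its partition column value
    cannot match any value in the min/max filter's range.
    """
    return [v for v in partition_values if not (lo <= v <= hi)]


# Assumed layout: 24 files, two per month value (months 1..12).
months = [m for _ in range(2) for m in range(1, 13)]

# Range of the filter built from b.int_col, per the test comment.
rejected = rejected_by_min_max(months, 0, 9)

print(len(months), len(rejected))  # files processed, files rejected
```

Under these assumptions only months 10, 11 and 12 lie outside [0, 9], and each appears twice, which gives the 6 rejected files out of 24 processed.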
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index b8bbba9..13057c0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -187,6 +187,7 @@ row_regex: .*FiltersReceived: 0 .*
 # be created, only min-max filter will be created.
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MODE=GLOBAL;
+SET MINMAX_FILTER_PARTITION_COLUMNS=FALSE;
 select STRAIGHT_JOIN count(*) from alltypes a
     join [SHUFFLE] alltypestiny b
     on a.month = b.month + 10000;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
index 8bd1d53..b6fab63 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
@@ -186,6 +186,7 @@ row_regex: .*FiltersReceived: 0 .*
 # from the three finstances are aggregated into a single one).
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MODE=GLOBAL;
+SET MINMAX_FILTER_PARTITION_COLUMNS=FALSE;
 select STRAIGHT_JOIN count(*) from alltypes a
     join [SHUFFLE] alltypestiny b
     on a.month = b.month + 10000;
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 006e766..593ca1e 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -286,15 +286,23 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
 
   def test_overlap_min_max_filters(self, vector, unique_database):
     self.execute_query("SET MINMAX_FILTER_THRESHOLD=0.5")
-    self.execute_query("SET MINMAX_FILTER_SORTED_COLUMNS=false")
+    # Disable min/max filters on partition columns and allow min/max filters
+    # on sorted columns (the default).
+    self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=false")
     self.run_test_case('QueryTest/overlap_min_max_filters', vector, unique_database,
         test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+    self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=true")
 
   def test_overlap_min_max_filters_on_sorted_columns(self, vector, unique_database):
     self.run_test_case('QueryTest/overlap_min_max_filters_on_sorted_columns', vector,
                        unique_database,
         test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
+  def test_overlap_min_max_filters_on_partition_columns(self, vector, unique_database):
+    self.run_test_case('QueryTest/overlap_min_max_filters_on_partition_columns', vector,
+                       unique_database,
+        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+
 # Apply both Bloom filter and Minmax filters
 class TestAllRuntimeFilters(ImpalaTestSuite):
   @classmethod
@@ -315,6 +323,7 @@ class TestAllRuntimeFilters(ImpalaTestSuite):
     self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=ALL")
     # Disable generating min/max filters for sorted columns
     self.execute_query("SET MINMAX_FILTER_SORTED_COLUMNS=false")
+    self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=false")
     self.run_test_case('QueryTest/all_runtime_filters', vector,
                        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
 
@@ -347,6 +356,7 @@ class TestRuntimeRowFilters(ImpalaTestSuite):
     new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     # Disable generating min/max filters for sorted columns
     self.execute_query("SET MINMAX_FILTER_SORTED_COLUMNS=false")
+    self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=false")
     self.run_test_case('QueryTest/runtime_row_filters', new_vector,
                        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})