You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/07/11 12:58:11 UTC

[impala] 01/02: IMPALA-10489: Implement JWT support

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 025500ccb5fed088e58da7bb7a8021088a9bba98
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Thu May 6 22:34:25 2021 -0700

    IMPALA-10489: Implement JWT support
    
    This patch added JWT support with the following functionality:
     * Load and parse JWKS from pre-installed JSON file.
     * Read the JWT token from the HTTP Header.
     * Verify the JWT's signature with public key in JWKS.
     * Get the username out of the payload of JWT token.
     * Support the following JSON Web Algorithms (JWA):
       HS256, HS384, HS512, RS256, RS384, RS512.
    
    We use the third-party library jwt-cpp to verify JWT tokens. jwt-cpp is a
    header-only C++ library. It was added to native-toolchain.
    This patch modified bootstrap_toolchain.py to download jwt-cpp from
    toolchain s3 bucket, and modified makefiles to add jwt-cpp/include
    in the include path.
    
    Added BE unit-tests for loading JWKS file and verifying JWT token.
    Also added FE custom cluster test for JWT authentication.
    
    Testing:
     - Passed core run.
    
    Change-Id: I6b71fa854c9ddc8ca882878853395e1eb866143c
    Reviewed-on: http://gerrit.cloudera.org:8080/17435
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 CMakeLists.txt                                     |   6 +
 be/CMakeLists.txt                                  |   1 +
 be/src/rpc/auth-provider.h                         |   6 +-
 be/src/rpc/authentication.cc                       | 103 ++-
 be/src/service/impala-server.cc                    |  17 +
 be/src/transport/THttpServer.cpp                   |  35 +-
 be/src/transport/THttpServer.h                     |  30 +-
 be/src/util/CMakeLists.txt                         |   3 +
 be/src/util/jwt-util-internal.h                    | 224 +++++++
 be/src/util/jwt-util-test.cc                       | 702 +++++++++++++++++++++
 be/src/util/jwt-util.cc                            | 595 +++++++++++++++++
 be/src/util/jwt-util.h                             |  91 +++
 be/src/util/webserver.cc                           |  78 ++-
 be/src/util/webserver.h                            |  14 +
 bin/bootstrap_toolchain.py                         |   2 +-
 bin/impala-config.sh                               |   2 +
 bin/rat_exclude_files.txt                          |   1 +
 cmake_modules/FindJwtCpp.cmake                     |  38 ++
 common/thrift/generate_error_codes.py              |   4 +
 common/thrift/metrics.json                         |  46 +-
 .../apache/impala/customcluster/JwtHttpTest.java   | 217 +++++++
 .../impala/customcluster/JwtWebserverTest.java     | 141 +++++
 .../apache/impala/customcluster/LdapHS2Test.java   |  75 +++
 .../impala/customcluster/LdapWebserverTest.java    |  52 ++
 testdata/jwt/jwks_rs256.json                       |   7 +
 25 files changed, 2464 insertions(+), 26 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 18ce4e7..de769c5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -102,6 +102,7 @@ set_dep_root(GFLAGS)
 set_dep_root(GLOG)
 set_dep_root(GPERFTOOLS)
 set_dep_root(GTEST)
+set_dep_root(JWT_CPP)
 set_dep_root(LIBEV)
 set_dep_root(LIBUNWIND)
 set_dep_root(LLVM)
@@ -289,6 +290,11 @@ IMPALA_ADD_THIRDPARTY_LIB(zstd ${ZSTD_INCLUDE_DIR} ${ZSTD_STATIC_LIB} "")
 find_package(Re2 REQUIRED)
 IMPALA_ADD_THIRDPARTY_LIB(re2 ${RE2_INCLUDE_DIR} ${RE2_STATIC_LIB} "")
 
+# find jwt-cpp headers
+find_package(JwtCpp REQUIRED)
+include_directories(${JWT_CPP_INCLUDE_DIR})
+message(STATUS "jwt-cpp include dir: " ${JWT_CPP_INCLUDE_DIR})
+
 # find rapidjson headers
 find_package(RapidJson REQUIRED)
 include_directories(${RAPIDJSON_INCLUDE_DIR})
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 638f0a7..fd1a40c 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -338,6 +338,7 @@ set(CLANG_INCLUDE_FLAGS
   "-I${GLOG_INCLUDE_DIR}"
   "-I${GFLAGS_INCLUDE_DIR}"
   "-I${GTEST_INCLUDE_DIR}"
+  "-I${JWT_CPP_INCLUDE_DIR}"
   "-I${RAPIDJSON_INCLUDE_DIR}"
   "-I${AVRO_INCLUDE_DIR}"
   "-I${ORC_INCLUDE_DIR}"
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 84a6f23..30986ac 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -83,7 +83,7 @@ class AuthProvider {
 class SecureAuthProvider : public AuthProvider {
  public:
   SecureAuthProvider(bool is_internal)
-    : has_ldap_(false), has_saml_(false), is_internal_(is_internal) {}
+    : has_ldap_(false), has_saml_(false), has_jwt_(false), is_internal_(is_internal) {}
 
   /// Performs initialization of external state.
   /// If we're using ldap, set up appropriate certificate usage.
@@ -131,6 +131,8 @@ class SecureAuthProvider : public AuthProvider {
 
   void InitSaml() { has_saml_ = true; }
 
+  void InitJwt() { has_jwt_ = true; }
+
   /// Used for testing
   const std::string& principal() const { return principal_; }
   const std::string& service_name() const { return service_name_; }
@@ -144,6 +146,8 @@ class SecureAuthProvider : public AuthProvider {
 
   bool has_saml_;
 
+  bool has_jwt_;
+
   /// Hostname of this machine - if kerberos, derived from principal.  If there
   /// is no kerberos, but LDAP is used, then acquired via GetHostname().
   std::string hostname_;
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index a482145..b2071a1 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -57,6 +57,7 @@
 #include "util/coding-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
+#include "util/jwt-util.h"
 #include "util/ldap-util.h"
 #include "util/network-util.h"
 #include "util/os-util.h"
@@ -137,6 +138,31 @@ DEFINE_bool_hidden(saml2_allow_without_tls_debug_only, false,
 
 DECLARE_string(saml2_sp_callback_url);
 
+// If set, Impala supports trusting authentication based on a JWT token in the HTTP
+// header.
+DEFINE_bool(jwt_token_auth, false,
+    "When true, read the JWT token out of the HTTP Header and extract user name from "
+    "the token payload.");
+// The last segment of a JWT is the signature, which is used to verify that the token was
+// signed by the sender and not altered in any way. By default, the signature of every
+// JWT token is validated; disabling validation may expose a security issue.
+DEFINE_bool(jwt_validate_signature, true,
+    "When true, validate the signature of JWT token with pre-installed JWKS.");
+// A JWKS conveys the public keys used by the signing party to the clients that need to
+// validate signatures. It represents cryptographic keys in a JSON data structure.
+DEFINE_string(jwks_file_path, "",
+    "File path of the pre-installed JSON Web Key Set (JWKS) for JWT verification");
+// This specifies the custom claim in the JWT that contains the "username" for the
+// session.
+DEFINE_string(jwt_custom_claim_username, "username", "Custom claim 'username'");
+// If set, Impala allows JWT authentication on an unsecure channel.
+// JWT is only secure when used with TLS. But in some deployment scenarios, TLS is handled
+// by a proxy so that it does not show up as TLS to Impala.
+DEFINE_bool_hidden(jwt_allow_without_tls, false,
+    "When this configuration is set to true, Impala allows JWT authentication on "
+    "unsecure channel. This should be only enabled for testing, or development for which "
+    "TLS is handled by proxy.");
+
 namespace impala {
 
 // Sasl callbacks.  Why are these here?  Well, Sasl isn't that bright, and
@@ -525,6 +551,41 @@ bool BasicAuth(ThriftServer::ConnectionContext* connection_context,
   return false;
 }
 
+bool JWTTokenAuth(ThriftServer::ConnectionContext* connection_context,
+    const AuthenticationHash& hash, const string& token) {
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  Status status = JWTHelper::Decode(token, decoded_token);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error decoding JWT token received from: "
+               << TNetworkAddressToString(connection_context->network_address)
+               << " Error: " << status;
+    return false;
+  }
+  if (FLAGS_jwt_validate_signature) {
+    status = JWTHelper::GetInstance()->Verify(decoded_token.get());
+    if (!status.ok()) {
+      LOG(ERROR) << "Error verifying JWT token received from: "
+                 << TNetworkAddressToString(connection_context->network_address)
+                 << " Error: " << status;
+      return false;
+    }
+  }
+
+  DCHECK(!FLAGS_jwt_custom_claim_username.empty());
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(
+      decoded_token.get(), FLAGS_jwt_custom_claim_username, username);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error extracting username from JWT token received from: "
+               << TNetworkAddressToString(connection_context->network_address)
+               << " Error: " << status;
+    return false;
+  }
+  connection_context->username = username;
+  // TODO: cookies are not added, but are not needed right now
+  return true;
+}
+
 // Performs a step of SPNEGO auth for the HTTP transport and sets the username on
 // 'connection_context' if auth is successful. 'header_token' is the value from an
 // 'Authorization: Negotiate" header. Returns true if the step was successful and sets
@@ -1060,14 +1121,14 @@ Status SecureAuthProvider::Start() {
 Status SecureAuthProvider::GetServerTransportFactory(
     ThriftServer::TransportType underlying_transport_type, const std::string& server_name,
     MetricGroup* metrics, std::shared_ptr<TTransportFactory>* factory) {
-  DCHECK(!principal_.empty() || has_ldap_ || has_saml_);
+  DCHECK(!principal_.empty() || has_ldap_ || has_saml_ || has_jwt_);
 
   if (underlying_transport_type == ThriftServer::HTTP) {
     bool has_kerberos = !principal_.empty();
     bool use_cookies = FLAGS_max_cookie_lifetime_s > 0;
     bool check_trusted_domain = !FLAGS_trusted_domain.empty();
     factory->reset(new THttpServerTransportFactory(server_name, metrics, has_ldap_,
-        has_kerberos, use_cookies, check_trusted_domain, has_saml_));
+        has_kerberos, use_cookies, check_trusted_domain, has_saml_, has_jwt_));
     return Status::OK();
   }
 
@@ -1191,6 +1252,10 @@ void SecureAuthProvider::SetupConnectionContext(
         callbacks.validate_saml2_bearer_fn =
             std::bind(ValidateSaml2Bearer, connection_ptr.get(), hash_);
       }
+      if (has_jwt_) {
+        callbacks.jwt_token_auth_fn =
+            std::bind(JWTTokenAuth, connection_ptr.get(), hash_, std::placeholders::_1);
+      }
       http_input_transport->setCallbacks(callbacks);
       http_output_transport->setCallbacks(callbacks);
       socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
@@ -1286,6 +1351,20 @@ Status AuthManager::Init() {
     }
   }
 
+  bool use_jwt = FLAGS_jwt_token_auth;
+  if (use_jwt) {
+    if (!IsExternalTlsConfigured()) {
+      if (!FLAGS_jwt_allow_without_tls) {
+        return Status("JWT authentication should be only used with TLS enabled.");
+      }
+      LOG(WARNING) << "JWT authentication is used without TLS.";
+    }
+    if (FLAGS_jwt_custom_claim_username.empty()) {
+      return Status(
+          "JWT authentication requires jwt_custom_claim_username to be specified.");
+    }
+  }
+
   // Get all of the flag validation out of the way
   if (FLAGS_enable_ldap_auth) {
     RETURN_IF_ERROR(
@@ -1338,7 +1417,7 @@ Status AuthManager::Init() {
 
   if (IsInternalKerberosEnabled()) {
     // Initialize the auth provider first, in case validation of the principal fails.
-    SecureAuthProvider* sap = NULL;
+    SecureAuthProvider* sap = nullptr;
     internal_auth_provider_.reset(sap = new SecureAuthProvider(true));
     RETURN_IF_ERROR(sap->InitKerberos(kerberos_internal_principal));
     LOG(INFO) << "Internal communication is authenticated with Kerberos";
@@ -1352,7 +1431,7 @@ Status AuthManager::Init() {
   // principal or ldap tells us to use a SecureAuthProvider, and we fill in
   // details from there.
   if (FLAGS_enable_ldap_auth || external_kerberos_enabled) {
-    SecureAuthProvider* sap = NULL;
+    SecureAuthProvider* sap = nullptr;
     external_auth_provider_.reset(sap = new SecureAuthProvider(false));
     if (external_kerberos_enabled) {
       RETURN_IF_ERROR(sap->InitKerberos(kerberos_external_principal));
@@ -1366,15 +1445,25 @@ Status AuthManager::Init() {
       LOG(INFO) << "External communication can be also authenticated with SAML2 SSO";
       sap->InitSaml();
     }
+    if (use_jwt) {
+      LOG(INFO) << "External communication can be also authenticated with JWT";
+      sap->InitJwt();
+    }
   } else {
     external_auth_provider_.reset(new NoAuthProvider());
     LOG(INFO) << "External communication is not authenticated for binary protocols";
     if (use_saml) {
-      SecureAuthProvider* sap = NULL;
+      SecureAuthProvider* sap = nullptr;
       external_http_auth_provider_.reset(sap = new SecureAuthProvider(false));
       sap->InitSaml();
-      LOG(INFO) <<
-          "External communication is authenticated for hs2-http protocol with SAML2 SSO";
+      LOG(INFO) << "External communication is authenticated for hs2-http protocol with "
+                   "SAML2 SSO";
+    } else if (use_jwt) {
+      SecureAuthProvider* sap = nullptr;
+      external_http_auth_provider_.reset(sap = new SecureAuthProvider(false));
+      sap->InitJwt();
+      LOG(INFO)
+          << "External communication is authenticated for hs2-http protocol with JWT";
     } else {
       LOG(INFO) << "External communication is not authenticated for hs2-http protocol";
     }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a48f7fe..355b835 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -87,6 +87,7 @@
 #include "util/error-util.h"
 #include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
+#include "util/jwt-util.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
@@ -343,6 +344,11 @@ DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
     "admission service, if enabled. Heartbeats are used to ensure resources are properly "
     "accounted for even if rpcs to the admission service occasionally fail.");
 
+// Flags for JWT token based authentication.
+DECLARE_bool(jwt_token_auth);
+DECLARE_bool(jwt_validate_signature);
+DECLARE_string(jwks_file_path);
+
 namespace {
 using namespace impala;
 
@@ -2869,6 +2875,17 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
     LOG(INFO) << "Initialized executor Impala server on "
               << TNetworkAddressToString(exec_env_->configured_backend_address());
   } else {
+    // Load JWKS from file if validation for signature of JWT token is enabled.
+    if (FLAGS_jwt_token_auth && FLAGS_jwt_validate_signature) {
+      if (!FLAGS_jwks_file_path.empty()) {
+        RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_file_path));
+      } else {
+        LOG(ERROR) << "JWKS file is not specified when the validation of JWT signature "
+                   << " is enabled.";
+        return Status("JWKS file is not specified");
+      }
+    }
+
     // Initialize the client servers.
     shared_ptr<ImpalaServer> handler = shared_from_this();
     if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0)) {
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index af12105..f4f6e1e 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -46,12 +46,13 @@ using strings::Substitute;
 
 THttpServerTransportFactory::THttpServerTransportFactory(const std::string server_name,
     impala::MetricGroup* metrics, bool has_ldap, bool has_kerberos, bool use_cookies,
-    bool check_trusted_domain, bool has_saml)
+    bool check_trusted_domain, bool has_saml, bool has_jwt)
   : has_ldap_(has_ldap),
     has_kerberos_(has_kerberos),
     use_cookies_(use_cookies),
     check_trusted_domain_(check_trusted_domain),
     has_saml_(has_saml),
+    has_jwt_(has_jwt),
     metrics_enabled_(metrics != nullptr) {
   if (metrics_enabled_) {
     if (has_ldap_) {
@@ -82,18 +83,25 @@ THttpServerTransportFactory::THttpServerTransportFactory(const std::string serve
       http_metrics_.total_saml_auth_failure_ =
           metrics->AddCounter(Substitute("$0.total-saml-auth-failure", server_name), 0);
     }
+    if (has_jwt_) {
+      http_metrics_.total_jwt_token_auth_success_ = metrics->AddCounter(
+          Substitute("$0.total-jwt-token-auth-success", server_name), 0);
+      http_metrics_.total_jwt_token_auth_failure_ = metrics->AddCounter(
+          Substitute("$0.total-jwt-token-auth-failure", server_name), 0);
+    }
   }
 }
 
 THttpServer::THttpServer(std::shared_ptr<TTransport> transport, bool has_ldap,
     bool has_kerberos, bool has_saml, bool use_cookies, bool check_trusted_domain,
-    bool metrics_enabled, HttpMetrics* http_metrics)
+    bool has_jwt, bool metrics_enabled, HttpMetrics* http_metrics)
   : THttpTransport(transport),
     has_ldap_(has_ldap),
     has_kerberos_(has_kerberos),
     has_saml_(has_saml),
     use_cookies_(use_cookies),
     check_trusted_domain_(check_trusted_domain),
+    has_jwt_(has_jwt),
     metrics_enabled_(metrics_enabled),
     http_metrics_(http_metrics) {}
 
@@ -127,7 +135,7 @@ void THttpServer::parseHeader(char* header) {
     contentLength_ = atoi(value);
   } else if (strncmp(header, "X-Forwarded-For", sz) == 0) {
     origin_ = value;
-  } else if ((has_ldap_ || has_kerberos_ || has_saml_)
+  } else if ((has_ldap_ || has_kerberos_ || has_saml_ || has_jwt_)
       && THRIFT_strncasecmp(header, "Authorization", sz) == 0) {
     auth_value_ = string(value);
   } else if (use_cookies_ && THRIFT_strncasecmp(header, "Cookie", sz) == 0) {
@@ -209,7 +217,7 @@ bool THttpServer::parseStatusLine(char* status) {
 }
 
 void THttpServer::headersDone() {
-  if (!has_ldap_ && !has_kerberos_ && !has_saml_) {
+  if (!has_ldap_ && !has_kerberos_ && !has_saml_ && !has_jwt_) {
     // We don't need to authenticate.
     resetAuthState();
     return;
@@ -240,6 +248,25 @@ void THttpServer::headersDone() {
     }
   }
 
+  if (!authorized && has_jwt_ && !auth_value_.empty()
+      && auth_value_.find('.') != string::npos) {
+    // Check Authorization header with the Bearer authentication scheme as:
+    // Authorization: Bearer <token>
+    // JWT contains at least one period ('.'). A well-formed JWT consists of three
+    // concatenated Base64url-encoded strings, separated by dots (.).
+    StripWhiteSpace(&auth_value_);
+    string jwt_token;
+    bool got_bearer_auth = TryStripPrefixString(auth_value_, "Bearer ", &jwt_token);
+    if (got_bearer_auth) {
+      if (callbacks_.jwt_token_auth_fn(jwt_token)) {
+        authorized = true;
+        if (metrics_enabled_) http_metrics_->total_jwt_token_auth_success_->Increment(1);
+      } else {
+        if (metrics_enabled_) http_metrics_->total_jwt_token_auth_failure_->Increment(1);
+      }
+    }
+  }
+
   if (!authorized && has_saml_) {
     bool fallback_to_other_auths = true;
     if (saml_port_ != -1) {
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index 3c26e7e..3407d3e 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -54,6 +54,9 @@ struct HttpMetrics {
 
   impala::IntCounter* total_saml_auth_success_ = nullptr;
   impala::IntCounter* total_saml_auth_failure_ = nullptr;
+
+  impala::IntCounter* total_jwt_token_auth_success_ = nullptr;
+  impala::IntCounter* total_jwt_token_auth_failure_ = nullptr;
 };
 
 /*
@@ -119,11 +122,17 @@ public:
     // SAML2 SSO.
     std::function<impala::TWrappedHttpRequest*()> init_wrapped_http_request_fn =
         [&]() { return (impala::TWrappedHttpRequest*) NULL; };
+
+    // Function that takes the JWT token from the header, and returns true
+    // if verification for the token is successful.
+    std::function<bool(const std::string&)> jwt_token_auth_fn = [&](const std::string&) {
+      return false;
+    };
   };
 
   THttpServer(std::shared_ptr<TTransport> transport, bool has_ldap, bool has_kerberos,
-      bool has_saml, bool use_cookies, bool check_trusted_domain, bool metrics_enabled,
-      HttpMetrics* http_metrics);
+      bool has_saml, bool use_cookies, bool check_trusted_domain, bool has_jwt,
+      bool metrics_enabled, HttpMetrics* http_metrics);
 
   virtual ~THttpServer();
 
@@ -147,9 +156,9 @@ protected:
   void resetAuthState();
  private:
   // If either of the following is true, a '401 - Unauthorized' will be returned to the
-  // client on requests that do not contain a valid 'Authorization' of SAML SSO related
-  // header. If 'has_ldap_' is true, 'Basic' auth headers will be processed, and if
-  // 'has_kerberos_' is true 'Negotiate' auth headers will be processed.
+  // client on requests that do not contain a valid 'Authorization' of SAML SSO or JWT
+  // related header. If 'has_ldap_' is true, 'Basic' auth headers will be processed, and
+  // if 'has_kerberos_' is true 'Negotiate' auth headers will be processed.
   bool has_ldap_ = false;
   bool has_kerberos_ = false;
 
@@ -186,6 +195,9 @@ protected:
   // trusted domain.
   bool check_trusted_domain_ = false;
 
+  // If set, authentication based on a JWT token is supported.
+  bool has_jwt_ = false;
+
   bool metrics_enabled_ = false;
   HttpMetrics* http_metrics_ = nullptr;
 
@@ -203,13 +215,14 @@ public:
 
  THttpServerTransportFactory(const std::string server_name, impala::MetricGroup* metrics,
      bool has_ldap, bool has_kerberos, bool use_cookies, bool check_trusted_domain,
-     bool has_saml);
+     bool has_saml, bool has_jwt);
 
  virtual ~THttpServerTransportFactory() {}
 
  virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) {
-   return std::shared_ptr<TTransport>(new THttpServer(trans, has_ldap_, has_kerberos_,
-       has_saml_, use_cookies_, check_trusted_domain_, metrics_enabled_, &http_metrics_));
+   return std::shared_ptr<TTransport>(
+       new THttpServer(trans, has_ldap_, has_kerberos_, has_saml_, use_cookies_,
+           check_trusted_domain_, has_jwt_, metrics_enabled_, &http_metrics_));
   }
 
  private:
@@ -218,6 +231,7 @@ public:
   bool use_cookies_ = false;
   bool check_trusted_domain_ = false;
   bool has_saml_ = false;
+  bool has_jwt_ = false;
 
   // Metrics for every transport produced by this factory.
   bool metrics_enabled_ = false;
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 25aae66..168112a 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -63,6 +63,7 @@ set(UTIL_SRCS
   impala-bloom-filter-buffer-allocator.cc
   jni-util.cc
   json-util.cc
+  jwt-util.cc
   ldap-util.cc
   ldap-search-bind.cc
   ldap-simple-bind.cc
@@ -129,6 +130,7 @@ add_library(UtilTests STATIC
   fixed-size-hash-table-test.cc
   hdfs-util-test.cc
   hdr-histogram-test.cc
+  jwt-util-test.cc
   logging-support-test.cc
   metrics-test.cc
   min-max-filter-test.cc
@@ -198,6 +200,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdr-histogram-test HdrHistogramTest.*)
 # internal-queue-test has a non-standard main(), so it needs a small amount of thought
 # to use a unified executable
 ADD_BE_LSAN_TEST(internal-queue-test)
+ADD_UNIFIED_BE_LSAN_TEST(jwt-util-test "JwtUtilTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(logging-support-test "LoggingSupport.*")
 ADD_UNIFIED_BE_LSAN_TEST(metrics-test "MetricsTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(min-max-filter-test "MinMaxFilterTest.*")
diff --git a/be/src/util/jwt-util-internal.h b/be/src/util/jwt-util-internal.h
new file mode 100644
index 0000000..568326d
--- /dev/null
+++ b/be/src/util/jwt-util-internal.h
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_JWT_UTIL_INTERNAL_H
+#define IMPALA_JWT_UTIL_INTERNAL_H
+
+#include <string>
+#include <unordered_map>
+
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wreserved-id-macro"
+#pragma clang diagnostic ignored "-Wunused-private-field"
+// picojson/picojson.h which is included by jwt-cpp/jwt.h defines __STDC_FORMAT_MACROS
+// without checking if it's already defined, so un-define the macro here to avoid
+// a re-definition error. Also hide the warning "macro name is a reserved identifier".
+#ifdef __STDC_FORMAT_MACROS
+#undef __STDC_FORMAT_MACROS
+#endif
+#include <jwt-cpp/jwt.h>
+#pragma clang diagnostic pop
+
+#include "common/logging.h"
+#include "common/status.h"
+
+namespace impala {
+
+using DecodedJWT = jwt::decoded_jwt<jwt::picojson_traits>;
+using JWTVerifier = jwt::verifier<jwt::default_clock, jwt::picojson_traits>;
+
+/// Key-Value map for parsing Json keys.
+typedef std::unordered_map<std::string, std::string> JsonKVMap;
+
+class JsonWebKeySet;
+
+/// JWTPublicKey:
+/// Represents a cryptographic public key for JSON Web Token (JWT) verification.
+class JWTPublicKey {
+ public:
+  JWTPublicKey(std::string algorithm, std::string pub_key)
+    : verifier_(jwt::verify()), algorithm_(algorithm), public_key_(pub_key) {}
+
+  /// Verify the given decoded token.
+  Status Verify(const DecodedJWT& decoded_jwt, const std::string& algorithm) const;
+
+  const std::string& get_algorithm() const { return algorithm_; }
+  const std::string& get_key() const { return public_key_; }
+
+ protected:
+  /// JWT Verifier.
+  JWTVerifier verifier_;
+
+ private:
+  /// Signing Algorithm:
+  /// Currently supports the following JSON Web Algorithms (JWA):
+  /// HS256, HS384, HS512, RS256, RS384, and RS512.
+  const std::string algorithm_;
+  /// Public key value:
+  /// For EC and RSA families of algorithms, it's the public key converted to PEM-encoded
+  /// format since jwt-cpp APIs only accept EC/RSA public keys in PEM-encoded format.
+  /// For HMAC-SHA2, it's the Octet Sequence key representing the secret key.
+  const std::string public_key_;
+};
+
+/// JWT Public Key for HS256.
+/// HS256: HMAC using SHA-256.
+class HS256JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw JWT exception if failed to initialize the verifier.
+  HS256JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs256(pub_key));
+  }
+};
+
+/// JWT Public Key for HS384.
+/// HS384: HMAC using SHA-384.
+class HS384JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw exception if failed to initialize the JWT verifier.
+  HS384JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs384(pub_key));
+  }
+};
+
+/// JWT Public Key for HS512.
+/// HS512: HMAC using SHA-512.
+class HS512JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw JWT exception if failed to initialize the verifier.
+  HS512JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::hs512(pub_key));
+  }
+};
+
+/// JWT Public Key for RS256.
+/// RS256: RSASSA-PKCS1-v1_5 using SHA-256.
+class RS256JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw JWT exception if failed to initialize the verifier.
+  RS256JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs256(pub_key, "", "", ""));
+  }
+};
+
+/// JWT Public Key for RS384.
+/// RS384: RSASSA-PKCS1-v1_5 using SHA-384.
+class RS384JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw exception if failed to initialize the JWT verifier.
+  RS384JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs384(pub_key, "", "", ""));
+  }
+};
+
+/// JWT Public Key for RS512.
+/// RS512: RSASSA-PKCS1-v1_5 using SHA-512.
+class RS512JWTPublicKey : public JWTPublicKey {
+ public:
+  /// Throw JWT exception if failed to initialize the verifier.
+  RS512JWTPublicKey(std::string algorithm, std::string pub_key)
+    : JWTPublicKey(algorithm, pub_key) {
+    verifier_.allow_algorithm(jwt::algorithm::rs512(pub_key, "", "", ""));
+  }
+};
+
+/// Construct a JWTPublicKey for the HS family of algorithms from the JWK.
+class HSJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+};
+
+/// Construct a JWTPublicKey for the RSA family of algorithms from the JWK.
+class RSAJWTPublicKeyBuilder {
+ public:
+  static Status CreateJWKPublicKey(JsonKVMap& kv_map, JWTPublicKey** pub_key_out);
+
+ private:
+  /// Convert public key of RSA from JWK format to PEM encoded format by using OpenSSL
+  /// APIs.
+  static bool ConvertJwkToPem(
+      const std::string& base64_n, const std::string& base64_e, std::string& pub_key);
+};
+
+/// JSON Web Key Set (JWKS) conveys the public keys used by the signing party to the
+/// clients that need to validate signatures. It represents a cryptographic key set in
+/// JSON data structure.
+/// This class works as a JWT provider, which loads the JWKS from a file, stores keys in
+/// internal maps for each family of algorithms, and provides APIs to retrieve a key by
+/// key-id.
+/// Init() should be called during the initialization of the daemon. The instance is not
+/// modified after Init() returns. The class is thread safe.
+class JsonWebKeySet {
+ public:
+  explicit JsonWebKeySet() {}
+
+  /// Map from a key ID (kid) to a JWTPublicKey.
+  typedef std::unordered_map<std::string, std::unique_ptr<JWTPublicKey>> JWTPublicKeyMap;
+
+  /// Load JWKS stored in a JSON file. Returns an error if problems were encountered
+  /// while parsing/constructing the Json Web keys. If no keys were given in the file,
+  /// the internal maps will be empty.
+  Status Init(const std::string& jwks_file_path);
+
+  /// Look up the key ID in the internal key maps and returns the key if the lookup was
+  /// successful, otherwise return nullptr.
+  const JWTPublicKey* LookupRSAPublicKey(const std::string& kid) const;
+  const JWTPublicKey* LookupHSKey(const std::string& kid) const;
+
+  /// Return number of keys for HS.
+  int GetHSKeyNum() const { return hs_key_map_.size(); }
+  /// Return number of keys for RSA.
+  int GetRSAPublicKeyNum() const { return rsa_pub_key_map_.size(); }
+
+  /// Return all keys for HS.
+  const JWTPublicKeyMap* GetAllHSKeys() const { return &hs_key_map_; }
+  /// Return all keys for RSA.
+  const JWTPublicKeyMap* GetAllRSAPublicKeys() const { return &rsa_pub_key_map_; }
+
+  /// Return TRUE if there is no key.
+  bool IsEmpty() const { return hs_key_map_.empty() && rsa_pub_key_map_.empty(); }
+
+ private:
+  friend class JWKSetParser;
+
+  /// Following two functions are called inside Init().
+  /// Add a RSA public key.
+  void AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key);
+  /// Add a HS key.
+  void AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key);
+
+  /// Note: According to section 4.5 of RFC 7517 (JSON Web Key), different keys might use
+  /// the same "kid" value if they have different "kty" (key type) values but are
+  /// considered to be equivalent alternatives by the application using them. So keys
+  /// for each "kty" are saved in different maps.
+
+  /// Octet Sequence keys for HS256 (HMAC using SHA-256), HS384 and HS512.
+  /// kty (key type): oct.
+  JWTPublicKeyMap hs_key_map_;
+  /// Public keys for RSA family of algorithms: RS256, RS384, RS512.
+  /// kty (key type): RSA.
+  JWTPublicKeyMap rsa_pub_key_map_;
+};
+
+} // namespace impala
+
+#endif
diff --git a/be/src/util/jwt-util-test.cc b/be/src/util/jwt-util-test.cc
new file mode 100644
index 0000000..c45db48
--- /dev/null
+++ b/be/src/util/jwt-util-test.cc
@@ -0,0 +1,702 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdio> // file stuff
+#include <gutil/strings/substitute.h>
+
+#include "jwt-util-internal.h"
+#include "jwt-util.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+using std::string;
+
+std::string rsa_priv_key_pem = R"(-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC4ZtdaIrd1BPIJ
+tfnF0TjIK5inQAXZ3XlCrUlJdP+XHwIRxdv1FsN12XyMYO/6ymLmo9ryoQeIrsXB
+XYqlET3zfAY+diwCb0HEsVvhisthwMU4gZQu6TYW2s9LnXZB5rVtcBK69hcSlA2k
+ZudMZWxZcj0L7KMfO2rIvaHw/qaVOE9j0T257Z8Kp2CLF9MUgX0ObhIsdumFRLaL
+DvDUmBPr2zuh/34j2XmWwn1yjN/WvGtdfhXW79Ki1S40HcWnygHgLV8sESFKUxxQ
+mKvPUTwDOIwLFL5WtE8Mz7N++kgmDcmWMCHc8kcOIu73Ta/3D4imW7VbKgHZo9+K
+3ESFE3RjAgMBAAECggEBAJTEIyjMqUT24G2FKiS1TiHvShBkTlQdoR5xvpZMlYbN
+tVWxUmrAGqCQ/TIjYnfpnzCDMLhdwT48Ab6mQJw69MfiXwc1PvwX1e9hRscGul36
+ryGPKIVQEBsQG/zc4/L2tZe8ut+qeaK7XuYrPp8bk/X1e9qK5m7j+JpKosNSLgJj
+NIbYsBkG2Mlq671irKYj2hVZeaBQmWmZxK4fw0Istz2WfN5nUKUeJhTwpR+JLUg4
+ELYYoB7EO0Cej9UBG30hbgu4RyXA+VbptJ+H042K5QJROUbtnLWuuWosZ5ATldwO
+u03dIXL0SH0ao5NcWBzxU4F2sBXZRGP2x/jiSLHcqoECgYEA4qD7mXQpu1b8XO8U
+6abpKloJCatSAHzjgdR2eRDRx5PMvloipfwqA77pnbjTUFajqWQgOXsDTCjcdQui
+wf5XAaWu+TeAVTytLQbSiTsBhrnoqVrr3RoyDQmdnwHT8aCMouOgcC5thP9vQ8Us
+rVdjvRRbnJpg3BeSNimH+u9AHgsCgYEA0EzcbOltCWPHRAY7B3Ge/AKBjBQr86Kv
+TdpTlxePBDVIlH+BM6oct2gaSZZoHbqPjbq5v7yf0fKVcXE4bSVgqfDJ/sZQu9Lp
+PTeV7wkk0OsAMKk7QukEpPno5q6tOTNnFecpUhVLLlqbfqkB2baYYwLJR3IRzboJ
+FQbLY93E8gkCgYB+zlC5VlQbbNqcLXJoImqItgQkkuW5PCgYdwcrSov2ve5r/Acz
+FNt1aRdSlx4176R3nXyibQA1Vw+ztiUFowiP9WLoM3PtPZwwe4bGHmwGNHPIfwVG
+m+exf9XgKKespYbLhc45tuC08DATnXoYK7O1EnUINSFJRS8cezSI5eHcbQKBgQDC
+PgqHXZ2aVftqCc1eAaxaIRQhRmY+CgUjumaczRFGwVFveP9I6Gdi+Kca3DE3F9Pq
+PKgejo0SwP5vDT+rOGHN14bmGJUMsX9i4MTmZUZ5s8s3lXh3ysfT+GAhTd6nKrIE
+kM3Nh6HWFhROptfc6BNusRh1kX/cspDplK5x8EpJ0QKBgQDWFg6S2je0KtbV5PYe
+RultUEe2C0jYMDQx+JYxbPmtcopvZQrFEur3WKVuLy5UAy7EBvwMnZwIG7OOohJb
+vkSpADK6VPn9lbqq7O8cTedEHttm6otmLt8ZyEl3hZMaL3hbuRj6ysjmoFKx6CrX
+rK0/Ikt5ybqUzKCMJZg2VKGTxg==
+-----END PRIVATE KEY-----)";
+std::string rsa_pub_key_pem = R"(-----BEGIN PUBLIC KEY-----
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAuGbXWiK3dQTyCbX5xdE4
+yCuYp0AF2d15Qq1JSXT/lx8CEcXb9RbDddl8jGDv+spi5qPa8qEHiK7FwV2KpRE9
+83wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qea1bXASuvYXEpQNpGbnTGVs
+WXI9C+yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTFIF9Dm4SLHbphUS2iw7w1JgT
+69s7of9+I9l5lsJ9cozf1rxrXX4V1u/SotUuNB3Fp8oB4C1fLBEhSlMcUJirz1E8
+AziMCxS+VrRPDM+zfvpIJg3JljAh3PJHDiLu902v9w+Iplu1WyoB2aPfitxEhRN0
+YwIDAQAB
+-----END PUBLIC KEY-----)";
+std::string rsa_pub_key_jwk_n =
+    "uGbXWiK3dQTyCbX5xdE4yCuYp0AF2d15Qq1JSXT_lx8CEcXb9RbDddl8jGDv-sp"
+    "i5qPa8qEHiK7FwV2KpRE983wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qe"
+    "a1bXASuvYXEpQNpGbnTGVsWXI9C-yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTF"
+    "IF9Dm4SLHbphUS2iw7w1JgT69s7of9-I9l5lsJ9cozf1rxrXX4V1u_SotUuNB3F"
+    "p8oB4C1fLBEhSlMcUJirz1E8AziMCxS-VrRPDM-zfvpIJg3JljAh3PJHDiLu902"
+    "v9w-Iplu1WyoB2aPfitxEhRN0Yw";
+std::string rsa_pub_key_jwk_e = "AQAB";
+std::string rsa_invalid_pub_key_jwk_n =
+    "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46"
+    "rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vd"
+    "i_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0i"
+    "wzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKh"
+    "CTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-X"
+    "kiCcrkyS1cnghnllh-LCwQu1sYw";
+
+std::string rsa512_priv_key_pem = R"(-----BEGIN RSA PRIVATE KEY-----
+MIICWwIBAAKBgQDdlatRjRjogo3WojgGHFHYLugdUWAY9iR3fy4arWNA1KoS8kVw
+33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQsHUfQrSDv+MuSUMAe8jzKE4qW
++jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5Do2kQ+X5xK9cipRgEKwIDAQAB
+AoGAD+onAtVye4ic7VR7V50DF9bOnwRwNXrARcDhq9LWNRrRGElESYYTQ6EbatXS
+3MCyjjX2eMhu/aF5YhXBwkppwxg+EOmXeh+MzL7Zh284OuPbkglAaGhV9bb6/5Cp
+uGb1esyPbYW+Ty2PC0GSZfIXkXs76jXAu9TOBvD0ybc2YlkCQQDywg2R/7t3Q2OE
+2+yo382CLJdrlSLVROWKwb4tb2PjhY4XAwV8d1vy0RenxTB+K5Mu57uVSTHtrMK0
+GAtFr833AkEA6avx20OHo61Yela/4k5kQDtjEf1N0LfI+BcWZtxsS3jDM3i1Hp0K
+Su5rsCPb8acJo5RO26gGVrfAsDcIXKC+bQJAZZ2XIpsitLyPpuiMOvBbzPavd4gY
+6Z8KWrfYzJoI/Q9FuBo6rKwl4BFoToD7WIUS+hpkagwWiz+6zLoX1dbOZwJACmH5
+fSSjAkLRi54PKJ8TFUeOP15h9sQzydI8zJU+upvDEKZsZc/UhT/SySDOxQ4G/523
+Y0sz/OZtSWcol/UMgQJALesy++GdvoIDLfJX5GBQpuFgFenRiRDabxrE9MNUZ2aP
+FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
+-----END RSA PRIVATE KEY-----)";
+std::string rsa512_pub_key_pem = R"(-----BEGIN PUBLIC KEY-----
+MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDdlatRjRjogo3WojgGHFHYLugd
+UWAY9iR3fy4arWNA1KoS8kVw33cJibXr8bvwUAUparCwlvdbH6dvEOfou0/gCFQs
+HUfQrSDv+MuSUMAe8jzKE4qW+jK+xQU9a03GUnKHkkle+Q0pX/g6jXZ7r1/xAK5D
+o2kQ+X5xK9cipRgEKwIDAQAB
+-----END PUBLIC KEY-----)";
+std::string rsa512_pub_key_jwk_n =
+    "3ZWrUY0Y6IKN1qI4BhxR2C7oHVFgGPYkd38uGq1jQNSqEvJFcN93CYm16_G78FA"
+    "FKWqwsJb3Wx-nbxDn6LtP4AhULB1H0K0g7_jLklDAHvI8yhOKlvoyvsUFPWtNxl"
+    "Jyh5JJXvkNKV_4Oo12e69f8QCuQ6NpEPl-cSvXIqUYBCs";
+std::string rsa512_pub_key_jwk_e = "AQAB";
+std::string rsa512_invalid_pub_key_jwk_n =
+    "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46"
+    "rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vd"
+    "i_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0i"
+    "wzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKh"
+    "CTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-X"
+    "kiCcrkyS1cnghnllh-LCwQu1sYw";
+
+std::string kid_1 = "public:c424b67b-fe28-45d7-b015-f79da50b5b21";
+std::string kid_2 = "public:9b9d0b47-b9ed-4ba6-9180-52fc5b161a3a";
+
+std::string jwks_hs_file_format = R"(
+{
+  "keys": [
+    { "kty": "oct", "kid": "$0", "alg": "$1", "k": "$2" }
+  ]
+})";
+
+std::string jwks_rsa_file_format = R"(
+{
+  "keys": [
+    { "kty": "RSA", "kid": "$0", "alg": "$1", "n": "$2", "e": "$3" },
+    { "kty": "RSA", "kid": "$4", "alg": "$5", "n": "$6", "e": "$7" }
+  ]
+})";
+
+/// Utility class for creating a file that will be automatically deleted upon test
+/// completion, i.e. when the object is destroyed. Any I/O failure is reported on stdout
+/// and aborts the process.
+class TempTestDataFile {
+ public:
+  /// Creates a temporary file from the template /tmp/jwks_XXXXXX holding 'contents'.
+  explicit TempTestDataFile(const std::string& contents);
+
+  /// Removes the file from disk.
+  ~TempTestDataFile() { Delete(); }
+
+  /// Not copyable: a copy would try to remove the same file a second time and abort.
+  TempTestDataFile(const TempTestDataFile&) = delete;
+  TempTestDataFile& operator=(const TempTestDataFile&) = delete;
+
+  /// Returns the absolute path to the file.
+  const std::string& Filename() const { return name_; }
+
+ private:
+  /// mkstemp() template on construction, the real file path afterwards.
+  std::string name_;
+  /// Set once Delete() has run so that the file is removed only once.
+  bool deleted_;
+
+  /// Prints 'what' followed by the description of the current errno, then aborts.
+  [[noreturn]] static void Fail(const char* what) {
+    // Capture errno first; the stream insertions below are allowed to change it.
+    const int err = errno;
+    std::cout << what << "; " << strerror(err) << std::endl;
+    abort();
+  }
+
+  /// Deletes this temporary file. Does nothing if it was already deleted.
+  void Delete();
+};
+
+TempTestDataFile::TempTestDataFile(const std::string& contents)
+  : name_("/tmp/jwks_XXXXXX"), deleted_(false) {
+  // mkstemp() replaces the trailing "XXXXXX" in place and returns an open descriptor.
+  int fd = mkstemp(&name_[0]);
+  if (fd == -1) Fail("Error creating temp file");
+
+  // Write through the descriptor mkstemp() returned instead of closing it and reopening
+  // the file by name, so the path is never resolved a second time.
+  FILE* handle = fdopen(fd, "w");
+  if (handle == nullptr) Fail("Error opening temp file");
+  if (fputs(contents.c_str(), handle) < 0) Fail("Error writing to temp file");
+  // fclose() flushes the buffered contents and closes 'fd' as well.
+  if (fclose(handle) != 0) Fail("Error closing temp file");
+}
+
+void TempTestDataFile::Delete() {
+  if (deleted_) return;
+  // Mark first so that a failed removal is never retried.
+  deleted_ = true;
+  if (remove(name_.c_str()) != 0) Fail("Error deleting temp file");
+}
+
+TEST(JwtUtilTest, LoadJwksFile) {
+  // Load JWKS from file; the checks below need Init() to succeed, hence ASSERT_OK.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_FALSE(jwks->IsEmpty());
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ("rs256", key1->get_algorithm());
+  ASSERT_EQ(rsa_pub_key_pem, key1->get_key());
+
+  std::string non_existing_kid("public:c424b67b-fe28-45d7-b015-f79da5-xxxxx");
+  const JWTPublicKey* missing_key = jwks->LookupRSAPublicKey(non_existing_kid);
+  ASSERT_TRUE(missing_key == nullptr);
+}
+
+TEST(JwtUtilTest, LoadInvalidJwksFiles) {
+  // Case 1: JWK without 'kid'. Init() must fail and leave the key set empty.
+  std::unique_ptr<TempTestDataFile> jwks_file(new TempTestDataFile(
+      "{"
+      "  \"keys\": ["
+      "    {"
+      "      \"use\": \"sig\","
+      "      \"kty\": \"RSA\","
+      "      \"alg\": \"RS256\","
+      "      \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
+      "      \"e\": \"AQAB\""
+      "    }"
+      "  ]"
+      "}"));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
+      << " Actual error: " << status.msg().msg();
+  ASSERT_TRUE(status.GetDetail().find("'kid' property is required") != std::string::npos)
+      << "actual error: " << status.GetDetail();
+  ASSERT_TRUE(jwt_helper.GetJWKS()->IsEmpty());
+
+  // Case 2: invalid JSON; the "]" closing "keys" and the outermost "}" are missing.
+  jwks_file.reset(new TempTestDataFile(
+      "{"
+      "  \"keys\": ["
+      "    {"
+      "      \"use\": \"sig\","
+      "      \"kty\": \"RSA\","
+      "      \"kid\": \"public:c424b67b-fe28-45d7-b015-f79da50b5b21\","
+      "      \"alg\": \"RS256\","
+      "      \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
+      "      \"e\": \"AQAB\""
+      "}"));
+  status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find("Missing a comma or ']' after an array element")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+
+  // Case 3: JWKS whose keys have an empty key id.
+  jwks_file.reset(new TempTestDataFile(
+      Substitute(jwks_rsa_file_format, "", "RS256", rsa_pub_key_jwk_n, rsa_pub_key_jwk_e,
+          "", "RS256", rsa_invalid_pub_key_jwk_n, rsa_pub_key_jwk_e)));
+  status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
+      << " Actual error: " << status.msg().msg();
+  ASSERT_TRUE(status.GetDetail().find("'kid' property must be a non-empty string")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+
+  // Case 4: JWKS whose keys have empty 'n' and 'e' values.
+  jwks_file.reset(new TempTestDataFile(
+      Substitute(jwks_rsa_file_format, kid_1, "RS256", "", "", kid_2, "RS256", "", "")));
+  status = jwt_helper.Init(jwks_file->Filename());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
+      << " Actual error: " << status.msg().msg();
+  ASSERT_TRUE(status.GetDetail().find("'n' and 'e' properties must be a non-empty string")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtHS256) {
+  // Cryptographic algorithm: HS256.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret = "Yx57JSBzhGFDgDj19CabRpH/+kiaKqI6UZI6lDunQKw=";
+  TempTestDataFile jwks_file(
+      Substitute(jwks_hs_file_format, kid_1, "HS256", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_TRUE(jwks != nullptr);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(shared_secret, key1->get_key());
+
+  // Create a JWT token and sign it with HS256.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS256")
+                   .set_key_id(kid_1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs256(shared_secret));
+
+  // Verify the JWT token with our wrapper class, which uses the shared key retrieved
+  // from the JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtHS384) {
+  // Cryptographic algorithm: HS384.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret =
+      "TlqmKRc2PNQJXTC3Go7eAadwPxA7x9byyXCi5I8tSvxrE77tYbuF5pfZAyswrkou";
+  TempTestDataFile jwks_file(
+      Substitute(jwks_hs_file_format, kid_1, "HS384", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_TRUE(jwks != nullptr);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(shared_secret, key1->get_key());
+
+  // Create a JWT token and sign it with HS384.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS384")
+                   .set_key_id(kid_1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs384(shared_secret));
+
+  // Verify the JWT token with our wrapper class, which uses the shared key retrieved
+  // from the JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtHS512) {
+  // Cryptographic algorithm: HS512.
+  // SharedSecret (Generated for MAC key (Base64 encoded)).
+  string shared_secret = "ywc6DN7+iRw1E5HOqzvrsYodykSLFutT28KN3bJnLZcZpPCNjn0b6gbMfXPcxeY"
+                         "VyuWWGDxh6gCDwPMejbuEEg==";
+  TempTestDataFile jwks_file(
+      Substitute(jwks_hs_file_format, kid_1, "HS512", shared_secret));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_TRUE(jwks != nullptr);
+  ASSERT_EQ(1, jwks->GetHSKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupHSKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(shared_secret, key1->get_key());
+
+  // Create a JWT token and sign it with HS512.
+  auto token = jwt::create()
+                   .set_issuer("auth0")
+                   .set_type("JWS")
+                   .set_algorithm("HS512")
+                   .set_key_id(kid_1)
+                   .set_payload_claim("username", picojson::value("impala"))
+                   .sign(jwt::algorithm::hs512(shared_secret));
+
+  // Verify the JWT token with our wrapper class, which uses the shared key retrieved
+  // from the JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS256) {
+  // Cryptographic algorithm: RS256.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(rsa_pub_key_pem, key1->get_key());
+
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id(kid_1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  ASSERT_EQ(
+      "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1mNzlkYTUwYj"
+      "ViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoiaW1wYWxhIn0.OW5H2SClL"
+      "lsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI02W69TZNat7DYcC86A_ogLMfNXagHj"
+      "lMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU554kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2"
+      "y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgcbZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO"
+      "25b5n4fryfKasSgaZfmk0CoOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKs"
+      "Pk0bg",
+      token);
+
+  // Verify the JWT token with jwt-cpp APIs directly.
+  auto jwt_decoded_token = jwt::decode(token);
+  auto verifier = jwt::verify()
+                      .allow_algorithm(jwt::algorithm::rs256(rsa_pub_key_pem, "", "", ""))
+                      .with_issuer("auth0");
+  verifier.verify(jwt_decoded_token);
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from the JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS384) {
+  // Cryptographic algorithm: RS384.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS384",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS384", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(rsa_pub_key_pem, key1->get_key());
+
+  // Create a JWT token and sign it with RS384.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS384")
+          .set_key_id(kid_1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs384(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from the JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtRS512) {
+  // Cryptographic algorithm: RS512.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS512",
+      rsa512_pub_key_jwk_n, rsa512_pub_key_jwk_e, kid_2, "RS512",
+      rsa512_invalid_pub_key_jwk_n, rsa512_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+  const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
+  ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
+
+  const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
+  ASSERT_TRUE(key1 != nullptr);
+  ASSERT_EQ(rsa512_pub_key_pem, key1->get_key());
+
+  // Create a JWT token and sign it with RS512.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS512")
+          .set_key_id(kid_1)
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs512(rsa512_pub_key_pem, rsa512_priv_key_pem, "", ""));
+
+  // Verify the JWT token with our wrapper class, which uses the public key retrieved
+  // from the JWKS, and read the username from the token.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtNotVerifySignature) {
+  // Create a JWT token and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+
+  // Decode only, without verifying the signature; the claim must still be readable.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  Status status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  string username;
+  status = JWTHelper::GetCustomClaimUsername(decoded_token.get(), "username", username);
+  EXPECT_OK(status);
+  ASSERT_EQ("impala", username);
+}
+
+TEST(JwtUtilTest, VerifyJwtFailMismatchingAlgorithms) {
+  // JWT algorithm is not matching with algorithm in JWK.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+
+  // Create a JWT token, but set mismatching algorithm.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS512")
+          .set_key_id(kid_1)
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  // Decoding succeeds, but verification must fail due to the mismatching algorithms.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find(
+                  "JWT algorithm 'rs512' is not matching with JWK algorithm 'rs256'")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtFailKeyNotFound) {
+  // The key cannot be found in JWKS.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+
+  // Create a JWT token with a key ID which can not be found in JWKS.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id("unfound-key-id")
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  // Decoding succeeds, but verification must fail since the key is not in the JWKS.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(
+      status.GetDetail().find("Invalid JWK ID in the JWT token") != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtTokenWithoutKeyId) {
+  // Verify JWT token without key ID.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+
+  // Create a JWT token without key ID.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").set_algorithm("RS256").sign(
+          jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+  // Verify the token by trying each key in JWK set and there is one matched key.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  EXPECT_OK(status);
+}
+
+TEST(JwtUtilTest, VerifyJwtFailTokenWithoutKeyId) {
+  // Verify JWT token without key ID.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+
+  // Create a JWT token without key ID, signed with a key that is not in the JWKS.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").set_algorithm("RS512").sign(
+          jwt::algorithm::rs512(rsa512_pub_key_pem, rsa512_priv_key_pem, "", ""));
+  // Verify the token by trying each key in JWK set, but there is no matched key.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+}
+
+TEST(JwtUtilTest, VerifyJwtFailTokenWithoutSignature) {
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+
+  // Create a JWT token without signature.
+  auto token =
+      jwt::create().set_issuer("auth0").set_type("JWS").sign(jwt::algorithm::none{});
+  // Decoding succeeds, but verification of the unsigned token must fail.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find("Unsecured JWT") != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+TEST(JwtUtilTest, VerifyJwtFailExpiredToken) {
+  // Load a JWKS with RS256 keys; the token below is signed with the key for kid_1.
+  TempTestDataFile jwks_file(Substitute(jwks_rsa_file_format, kid_1, "RS256",
+      rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
+      rsa_pub_key_jwk_e));
+  JWTHelper jwt_helper;
+  Status status = jwt_helper.Init(jwks_file.Filename());
+  ASSERT_OK(status);
+
+  // Create a JWT token that expired 10 seconds ago and sign it with RS256.
+  auto token =
+      jwt::create()
+          .set_issuer("auth0")
+          .set_type("JWS")
+          .set_algorithm("RS256")
+          .set_key_id(kid_1)
+          .set_issued_at(std::chrono::system_clock::now())
+          .set_expires_at(std::chrono::system_clock::now() - std::chrono::seconds{10})
+          .set_payload_claim("username", picojson::value("impala"))
+          .sign(jwt::algorithm::rs256(rsa_pub_key_pem, rsa_priv_key_pem, "", ""));
+
+  // Verify the token, including expiring time.
+  JWTHelper::UniqueJWTDecodedToken decoded_token;
+  status = JWTHelper::Decode(token, decoded_token);
+  ASSERT_OK(status);
+  status = jwt_helper.Verify(decoded_token.get());
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.GetDetail().find("Verification failed, error: token expired")
+      != std::string::npos)
+      << " Actual error: " << status.GetDetail();
+}
+
+} // namespace impala
diff --git a/be/src/util/jwt-util.cc b/be/src/util/jwt-util.cc
new file mode 100644
index 0000000..27c4a68
--- /dev/null
+++ b/be/src/util/jwt-util.cc
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string.h>
+#include <cerrno>
+#include <ostream>
+#include <unordered_map>
+#include <vector>
+#include <sys/stat.h>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/escaping.h>
+#include <gutil/strings/substitute.h>
+#include <openssl/bio.h>
+#include <openssl/ec.h>
+#include <openssl/evp.h>
+#include <openssl/pem.h>
+#include <openssl/rsa.h>
+#include <rapidjson/document.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/filereadstream.h>
+
+#include "common/names.h"
+#include "jwt-util-internal.h"
+#include "jwt-util.h"
+
+namespace impala {
+
+using rapidjson::Document;
+using rapidjson::Value;
+
+// JWK Set (JSON Web Key Set) is a JSON data structure that represents a set of JWKs.
+// This class parses JWKS file.
+class JWKSetParser {
+ public:
+  // 'jwks' is the key set that Parse() populates. The pointer is stored as is; this
+  // class never deletes it.
+  JWKSetParser(JsonWebKeySet* jwks) : jwks_(jwks) {}
+
+  // Perform the parsing and populate JWKS's internal map. Return error status if
+  // encountering any error. The root object must contain a "keys" member and no other
+  // member; any other property is reported as an error.
+  Status Parse(const Document& rules_doc) {
+    bool found_keys = false;
+    for (Value::ConstMemberIterator member = rules_doc.MemberBegin();
+         member != rules_doc.MemberEnd(); ++member) {
+      if (strcmp("keys", member->name.GetString()) == 0) {
+        found_keys = true;
+        RETURN_IF_ERROR(ParseKeys(member->value));
+      } else {
+        return Status(TErrorCode::JWKS_PARSE_ERROR,
+            Substitute(
+                "Unexpected property '$0' must be removed", member->name.GetString()));
+      }
+    }
+    if (!found_keys) {
+      return Status(TErrorCode::JWKS_PARSE_ERROR, "An array of keys is required");
+    }
+    return Status::OK();
+  }
+
+ private:
+  // Destination key set for the parsed keys.
+  JsonWebKeySet* jwks_;
+
+  // Returns a printable name for the JSON type of 'value'; used in error messages.
+  string NameOfTypeOfJsonValue(const Value& value) {
+    switch (value.GetType()) {
+      case rapidjson::kNullType:
+        return "Null";
+      case rapidjson::kFalseType:
+      case rapidjson::kTrueType:
+        return "Bool";
+      case rapidjson::kObjectType:
+        return "Object";
+      case rapidjson::kArrayType:
+        return "Array";
+      case rapidjson::kStringType:
+        return "String";
+      case rapidjson::kNumberType:
+        // A number that is not a double is one of the integer representations (int,
+        // uint, int64 or uint64). Testing only IsInt() used to let values outside the
+        // 32-bit signed range fall through to the DCHECK below.
+        return value.IsDouble() ? "Float" : "Integer";
+      default:
+        DCHECK(false);
+        return "Unknown";
+    }
+  }
+
+  // Parse an array of keys. 'keys' must be a non-empty JSON array of objects; each
+  // element is handed to ParseKey(). An error from ParseKey() is prefixed with the
+  // index of the offending key.
+  Status ParseKeys(const Value& keys) {
+    if (!keys.IsArray()) {
+      return Status(TErrorCode::JWKS_PARSE_ERROR,
+          Substitute(
+              "'keys' must be of type Array but is a '$0'", NameOfTypeOfJsonValue(keys)));
+    } else if (keys.Size() == 0) {
+      return Status(
+          TErrorCode::JWKS_PARSE_ERROR, Substitute("'keys' must be a non empty Array"));
+    }
+    for (rapidjson::SizeType key_idx = 0; key_idx < keys.Size(); ++key_idx) {
+      const Value& key = keys[key_idx];
+      if (!key.IsObject()) {
+        return Status(TErrorCode::JWKS_PARSE_ERROR,
+            Substitute("parsing key #$0, key should be a JSON Object but is a '$1'.",
+                key_idx, NameOfTypeOfJsonValue(key)));
+      }
+      Status status = ParseKey(key);
+      if (!status.ok()) {
+        Status parse_status(
+            TErrorCode::JWKS_PARSE_ERROR, Substitute("parsing key #$0, ", key_idx));
+        parse_status.MergeStatus(status);
+        return parse_status;
+      }
+    }
+    return Status::OK();
+  }
+
+  // Parse a public key and populate JWKS's internal map.
+  // All properties of 'json_key' are first collected into a name -> value map. 'kty'
+  // and a non-empty 'kid' are required; 'kty' (compared case-insensitively) selects
+  // the builder that creates the key: "oct" for HS keys, "rsa" for RSA public keys.
+  Status ParseKey(const Value& json_key) {
+    std::unordered_map<std::string, std::string> kv_map;
+    for (Value::ConstMemberIterator member = json_key.MemberBegin();
+         member != json_key.MemberEnd(); ++member) {
+      string k(member->name.GetString());
+      // 'v' is declared per iteration: ReadKeyProperty() leaves it untouched for a
+      // null-valued property, which must be recorded as an empty string rather than
+      // inherit the value of the previously read property.
+      string v;
+      RETURN_IF_ERROR(ReadKeyProperty(k, json_key, &v, /*required*/ false));
+      if (kv_map.find(k) == kv_map.end()) {
+        kv_map.insert(make_pair(k, v));
+      } else {
+        LOG(WARNING) << "Duplicate property of JWK: " << k;
+      }
+    }
+
+    auto it_kty = kv_map.find("kty");
+    if (it_kty == kv_map.end()) return Status("'kty' property is required");
+    auto it_kid = kv_map.find("kid");
+    if (it_kid == kv_map.end()) return Status("'kid' property is required");
+    string key_id = it_kid->second;
+    if (key_id.empty()) {
+      return Status(Substitute("'kid' property must be a non-empty string"));
+    }
+
+    Status status;
+    string key_type = boost::algorithm::to_lower_copy(it_kty->second);
+    if (key_type.compare("oct") == 0) {
+      JWTPublicKey* jwt_pub_key = nullptr;
+      status = HSJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddHSKey(key_id, jwt_pub_key);
+    } else if (key_type.compare("rsa") == 0) {
+      JWTPublicKey* jwt_pub_key = nullptr;
+      status = RSAJWTPublicKeyBuilder::CreateJWKPublicKey(kv_map, &jwt_pub_key);
+      if (status.ok()) jwks_->AddRSAPublicKey(key_id, jwt_pub_key);
+    } else {
+      return Status(Substitute("Unsupported kty: '$0'", key_type));
+    }
+    return status;
+  }
+
+  // Reads a key property of the given name and assigns the property value to the out
+  // parameter. Returns Status::OK() on success. A null property is an error when
+  // 'required' is true; otherwise it is accepted and '*value' is left unchanged.
+  template <typename T>
+  Status ReadKeyProperty(
+      const string& name, const Value& json_key, T* value, bool required = true) {
+    const Value& json_value = json_key[name.c_str()];
+    if (json_value.IsNull()) {
+      if (required) {
+        return Status(Substitute("'$0' property is required and cannot be null", name));
+      } else {
+        return Status::OK();
+      }
+    }
+    return ValidateTypeAndExtractValue(name, json_value, value);
+  }
+
+// Extract a value stored in a rapidjson::Value and assign it to the out parameter.
+// The type will be validated before extraction. Returns Status::OK() on success.
+// The name parameter is only used to generate an error message upon failure.
+#define EXTRACT_VALUE(json_type, cpp_type)                                             \
+  Status ValidateTypeAndExtractValue(                                                  \
+      const string& name, const Value& json_value, cpp_type* value) {                  \
+    if (!json_value.Is##json_type()) {                                                 \
+      return Status(                                                                   \
+          Substitute("'$0' property must be of type " #json_type " but is a $1", name, \
+              NameOfTypeOfJsonValue(json_value)));                                     \
+    }                                                                                  \
+    *value = json_value.Get##json_type();                                              \
+    return Status::OK();                                                               \
+  }
+
+  EXTRACT_VALUE(String, string)
+  // EXTRACT_VALUE(Bool, bool)
+};
+
+//
+// JWTPublicKey member functions.
+//
+// Verify JWT's signature for the given decoded token with jwt-cpp API.
+Status JWTPublicKey::Verify(
+    const DecodedJWT& decoded_jwt, const std::string& algorithm) const {
+  // 'algorithm' is the algorithm name taken from the token; it must be equal to the
+  // algorithm this key was created with. Returns Status::OK() when jwt-cpp accepts
+  // the token, otherwise a JWT_VERIFY_FAILED status describing the failure.
+  // Verify if algorithms are matching.
+  if (algorithm_.compare(algorithm) != 0) {
+    return Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("JWT algorithm '$0' is not matching with JWK algorithm '$1'",
+            algorithm, algorithm_));
+  }
+
+  Status status;
+  try {
+    // Call jwt-cpp API to verify token's signature.
+    verifier_.verify(decoded_jwt);
+  } catch (const jwt::error::rsa_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED, Substitute("RSA error: $0", e.what()));
+  } catch (const jwt::error::token_verification_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Verification failed, error: $0", e.what()));
+  } catch (const std::exception& e) {
+    // Fallback for any other exception. The message previously read "Varification".
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Verification failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+// Create a JWKPublicKey of HS from the JWK.
+Status HSJWTPublicKeyBuilder::CreateJWKPublicKey(
+    JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // Builds a key for HS256, HS384 or HS512 from the properties of an Octet Sequence
+  // JWK. 'alg' and 'k' are required and must be non-empty; 'alg' is matched
+  // case-insensitively. '*pub_key_out' is written only on success.
+  // JWK Sample:
+  // {
+  //   "kty":"oct",
+  //   "alg":"HS256",
+  //   "k":"f83OJ3D2xF1Bg8vub9tLe1gHMzV76e8Tus9uPHvRVEU",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  auto alg_entry = kv_map.find("alg");
+  if (alg_entry == kv_map.end()) return Status("'alg' property is required");
+  string alg_name = boost::algorithm::to_lower_copy(alg_entry->second);
+  if (alg_name.empty()) {
+    return Status(Substitute("'alg' property must be a non-empty string"));
+  }
+
+  auto secret_entry = kv_map.find("k");
+  if (secret_entry == kv_map.end()) return Status("'k' property is required");
+  if (secret_entry->second.empty()) {
+    return Status(Substitute("'k' property must be a non-empty string"));
+  }
+
+  JWTPublicKey* created_key = nullptr;
+  try {
+    if (alg_name == "hs256") {
+      created_key = new HS256JWTPublicKey(alg_name, secret_entry->second);
+    } else if (alg_name == "hs384") {
+      created_key = new HS384JWTPublicKey(alg_name, secret_entry->second);
+    } else if (alg_name == "hs512") {
+      created_key = new HS512JWTPublicKey(alg_name, secret_entry->second);
+    } else {
+      return Status(Substitute("Invalid 'alg' property value: '$0'", alg_name));
+    }
+  } catch (const std::exception& e) {
+    // Constructing the key failed; report it instead of letting the exception escape.
+    return Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+  }
+  *pub_key_out = created_key;
+  return Status::OK();
+}
+
+// Create a JWKPublicKey of RSA from the JWK.
+Status RSAJWTPublicKeyBuilder::CreateJWKPublicKey(
+    JsonKVMap& kv_map, JWTPublicKey** pub_key_out) {
+  // Builds a key for RS256, RS384 or RS512 from the properties of an RSA JWK. 'alg',
+  // 'n' and 'e' are required and must be non-empty; 'alg' is matched
+  // case-insensitively. '*pub_key_out' is written only on success.
+  // JWK Sample:
+  // {
+  //   "kty":"RSA",
+  //   "alg":"RS256",
+  //   "n":"sttddbg-_yjXzcFpbMJB1fI9...Q_QDhvqXx8eQ1r9smM",
+  //   "e":"AQAB",
+  //   "kid":"Id that can be uniquely Identified"
+  // }
+  auto alg_entry = kv_map.find("alg");
+  if (alg_entry == kv_map.end()) return Status("'alg' property is required");
+  string alg_name = boost::algorithm::to_lower_copy(alg_entry->second);
+  if (alg_name.empty()) {
+    return Status(Substitute("'alg' property must be a non-empty string"));
+  }
+
+  auto modulus_entry = kv_map.find("n");
+  auto exponent_entry = kv_map.find("e");
+  if (modulus_entry == kv_map.end() || exponent_entry == kv_map.end()) {
+    return Status("'n' and 'e' properties are required");
+  }
+  if (modulus_entry->second.empty() || exponent_entry->second.empty()) {
+    return Status("'n' and 'e' properties must be a non-empty string");
+  }
+
+  // Converts public key to PEM encoded form.
+  string pem_key;
+  if (!ConvertJwkToPem(modulus_entry->second, exponent_entry->second, pem_key)) {
+    return Status(Substitute("Invalid public key 'n':'$0', 'e':'$1'",
+        modulus_entry->second, exponent_entry->second));
+  }
+
+  JWTPublicKey* created_key = nullptr;
+  try {
+    if (alg_name == "rs256") {
+      created_key = new RS256JWTPublicKey(alg_name, pem_key);
+    } else if (alg_name == "rs384") {
+      created_key = new RS384JWTPublicKey(alg_name, pem_key);
+    } else if (alg_name == "rs512") {
+      created_key = new RS512JWTPublicKey(alg_name, pem_key);
+    } else {
+      return Status(Substitute("Invalid 'alg' property value: '$0'", alg_name));
+    }
+  } catch (const jwt::error::rsa_exception& e) {
+    return Status(TErrorCode::JWT_VERIFY_FAILED, Substitute("RSA error: $0", e.what()));
+  } catch (const std::exception& e) {
+    return Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Failed to initialize verifier, error: $0", e.what()));
+  }
+  *pub_key_out = created_key;
+  return Status::OK();
+}
+
+// Convert public key of RSA from JWK format to PEM encoded format by using OpenSSL APIs.
+bool RSAJWTPublicKeyBuilder::ConvertJwkToPem(
+    const std::string& base64_n, const std::string& base64_e, std::string& pub_key) {
+  // 'base64_n' and 'base64_e' are the web-safe Base64 encoded modulus and exponent of
+  // the JWK. On success 'pub_key' holds the PEM text without the trailing newline and
+  // true is returned. On any failure 'pub_key' is left empty and false is returned.
+  pub_key.clear();
+  string str_n, str_e;
+  if (!WebSafeBase64Unescape(base64_n, &str_n)) return false;
+  if (!WebSafeBase64Unescape(base64_e, &str_e)) return false;
+  BIGNUM* modul = BN_bin2bn(
+      reinterpret_cast<const unsigned char*>(str_n.data()), str_n.size(), nullptr);
+  BIGNUM* expon = BN_bin2bn(
+      reinterpret_cast<const unsigned char*>(str_e.data()), str_e.size(), nullptr);
+  RSA* rsa = RSA_new();
+  if (modul == nullptr || expon == nullptr || rsa == nullptr) {
+    // An allocation failed. BN_free() and RSA_free() do nothing for a null argument.
+    BN_free(modul);
+    BN_free(expon);
+    RSA_free(rsa);
+    return false;
+  }
+
+  // From here on 'rsa' owns 'modul' and 'expon'; RSA_free() releases them.
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  rsa->n = modul;
+  rsa->e = expon;
+#else
+  // RSA_set0_key is a new API introduced in OpenSSL version 1.1
+  if (RSA_set0_key(rsa, modul, expon, nullptr) != 1) {
+    // Ownership was not transferred, so the numbers must be released here.
+    BN_free(modul);
+    BN_free(expon);
+    RSA_free(rsa);
+    return false;
+  }
+#endif
+
+  BIO* bio = BIO_new(BIO_s_mem());
+  if (bio != nullptr && PEM_write_bio_RSA_PUBKEY(bio, rsa) == 1) {
+    // Copy exactly the bytes OpenSSL wrote. Reading through a fixed-size buffer would
+    // truncate the PEM text of large keys and relies on a terminating NUL that is not
+    // guaranteed when the buffer is filled completely.
+    char* pem_data = nullptr;
+    const long pem_len = BIO_get_mem_data(bio, &pem_data);
+    if (pem_len > 0 && pem_data != nullptr) {
+      pub_key.assign(pem_data, pem_len);
+      // Remove last '\n'.
+      if (pub_key.back() == '\n') pub_key.pop_back();
+    }
+  }
+  BIO_free(bio);
+  RSA_free(rsa);
+  return !pub_key.empty();
+}
+
+//
+// JsonWebKeySet member functions.
+//
+
+Status JsonWebKeySet::Init(const string& jwks_file_path) {
+  // Loads the key set from the JSON file at 'jwks_file_path'. Keys loaded by an
+  // earlier call are discarded first. An empty file is accepted and leaves the key
+  // set empty. Returns an error if the file cannot be opened or is not a valid JWKS.
+  hs_key_map_.clear();
+  rsa_pub_key_map_.clear();
+
+  // Read the file.
+  FILE* jwks_file = fopen(jwks_file_path.c_str(), "r");
+  if (jwks_file == nullptr) {
+    return Status(
+        Substitute("Could not open JWKS file '$0'; $1", jwks_file_path, strerror(errno)));
+  }
+  // Check for an empty file and ignore it.
+  struct stat jwks_file_stats;
+  if (fstat(fileno(jwks_file), &jwks_file_stats)) {
+    // Save errno before fclose(), which may overwrite it, so that the message reports
+    // the fstat() failure.
+    const int fstat_errno = errno;
+    fclose(jwks_file);
+    return Status(Substitute(
+        "Error reading JWKS file '$0'; $1", jwks_file_path, strerror(fstat_errno)));
+  }
+  if (jwks_file_stats.st_size == 0) {
+    fclose(jwks_file);
+    return Status::OK();
+  }
+
+  // Parse the whole stream before closing the file; the document does not refer to
+  // the file afterwards.
+  char readBuffer[65536];
+  rapidjson::FileReadStream stream(jwks_file, readBuffer, sizeof(readBuffer));
+  Document jwks_doc;
+  jwks_doc.ParseStream(stream);
+  fclose(jwks_file);
+  if (jwks_doc.HasParseError()) {
+    return Status(
+        TErrorCode::JWKS_PARSE_ERROR, GetParseError_En(jwks_doc.GetParseError()));
+  }
+  if (!jwks_doc.IsObject()) {
+    return Status(TErrorCode::JWKS_PARSE_ERROR, "root element must be a JSON Object");
+  }
+  if (!jwks_doc.HasMember("keys")) {
+    return Status(TErrorCode::JWKS_PARSE_ERROR, "keys is required");
+  }
+
+  JWKSetParser jwks_parser(this);
+  return jwks_parser.Parse(jwks_doc);
+}
+
+void JsonWebKeySet::AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
+  // Takes ownership of 'jwk_pub_key'. The first key registered for a key ID wins; a
+  // later key with the same ID is logged and discarded.
+  if (hs_key_map_.find(key_id) == hs_key_map_.end()) {
+    hs_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for HS key: " << key_id;
+    // The map did not take the key, so free it here; otherwise it would be leaked.
+    delete jwk_pub_key;
+  }
+}
+
+void JsonWebKeySet::AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
+  // Takes ownership of 'jwk_pub_key'. The first key registered for a key ID wins; a
+  // later key with the same ID is logged and discarded.
+  if (rsa_pub_key_map_.find(key_id) == rsa_pub_key_map_.end()) {
+    rsa_pub_key_map_[key_id].reset(jwk_pub_key);
+  } else {
+    LOG(WARNING) << "Duplicate key ID of JWK for RSA public key: " << key_id;
+    // The map did not take the key, so free it here; otherwise it would be leaked.
+    delete jwk_pub_key;
+  }
+}
+
+const JWTPublicKey* JsonWebKeySet::LookupHSKey(const std::string& kid) const {
+  // Returns the HS key registered under 'kid', or nullptr if there is none. The
+  // returned pointer refers to the key held in the map.
+  const auto entry = hs_key_map_.find(kid);
+  return entry != hs_key_map_.end() ? entry->second.get() : nullptr;
+}
+
+const JWTPublicKey* JsonWebKeySet::LookupRSAPublicKey(const std::string& kid) const {
+  // Returns the RSA public key registered under 'kid', or nullptr if there is none.
+  // The returned pointer refers to the key held in the map.
+  const auto entry = rsa_pub_key_map_.find(kid);
+  return entry != rsa_pub_key_map_.end() ? entry->second.get() : nullptr;
+}
+
+//
+// JWTHelper member functions.
+//
+
+// Definition of the opaque token type declared in jwt-util.h. It holds a copy of the
+// decoded token produced by jwt-cpp.
+struct JWTHelper::JWTDecodedToken {
+  JWTDecodedToken(const DecodedJWT& source_jwt) : decoded_jwt_(source_jwt) {}
+
+  DecodedJWT decoded_jwt_;
+};
+
+// The single instance handed out by JWTHelper::GetInstance(). It is allocated during
+// static initialization and still has to be set up by a call to Init().
+JWTHelper* JWTHelper::jwt_helper_ = new JWTHelper();
+
+void JWTHelper::TokenDeleter::operator()(JWTHelper::JWTDecodedToken* token) const {
+  // Deleting a null pointer has no effect, so no explicit null check is needed.
+  delete token;
+}
+
+Status JWTHelper::Init(const std::string& jwks_file_path) {
+  // Replace the current key set with a fresh one loaded from 'jwks_file_path'.
+  // 'initialized_' is set only when loading succeeds.
+  jwks_.reset(new JsonWebKeySet());
+  const Status load_status = jwks_->Init(jwks_file_path);
+  if (!load_status.ok()) return load_status;
+
+  // A key set without keys is accepted, but logged.
+  if (jwks_->IsEmpty()) {
+    LOG(WARNING) << "JWKS file is empty.";
+  }
+  initialized_ = true;
+  return Status::OK();
+}
+
+// Decode the given JWT token.
+Status JWTHelper::Decode(const string& token, UniqueJWTDecodedToken& decoded_token_out) {
+  // 'token' is the encoded JWT text. On success 'decoded_token_out' takes ownership of
+  // the decoded token. If decoding throws, 'decoded_token_out' is not assigned and a
+  // JWT_VERIFY_FAILED status is returned.
+  Status status;
+  try {
+    // Call jwt-cpp API to decode the JWT token with default jwt::json_traits
+    // (jwt::picojson_traits).
+    decoded_token_out.reset(new JWTDecodedToken(jwt::decode(token)));
+#ifndef NDEBUG
+    // Debug builds dump the header and payload claims at verbose level 3.
+    std::stringstream msg;
+    msg << "JWT token header: ";
+    for (auto& e : decoded_token_out.get()->decoded_jwt_.get_header_claims()) {
+      msg << e.first << "=" << e.second.to_json().serialize() << ";";
+    }
+    msg << " JWT token payload: ";
+    for (auto& e : decoded_token_out.get()->decoded_jwt_.get_payload_claims()) {
+      msg << e.first << "=" << e.second.to_json().serialize() << ";";
+    }
+    VLOG(3) << msg.str();
+#endif
+  } catch (const std::invalid_argument& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Token is not in correct format, error: $0", e.what()));
+  } catch (const std::runtime_error& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Base64 decoding failed or invalid json, error: $0", e.what()));
+  } catch (const std::exception& e) {
+    // The token text comes from the client, so no exception may escape from here.
+    // This mirrors the std::exception fallback used by Verify().
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Token decoding failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+// Validate the token's signature with public key.
+Status JWTHelper::Verify(const JWTDecodedToken* decoded_token) const {
+  // 'decoded_token' must be non-null. Init() must have been called before. Returns
+  // Status::OK() if the signature is accepted, otherwise a JWT_VERIFY_FAILED status.
+  DCHECK(initialized_);
+  DCHECK(decoded_token != nullptr);
+
+  if (decoded_token->decoded_jwt_.get_signature().empty()) {
+    // Don't accept JWT without a signature.
+    return Status(TErrorCode::JWT_VERIFY_FAILED, "Unsecured JWT");
+  } else if (jwks_ == nullptr) {
+    // Skip signature validation if there is no key set.
+    return Status::OK();
+  }
+
+  Status status;
+  try {
+    // The algorithm family is selected by the first two characters of the lower-cased
+    // algorithm name: "hs" or "rs".
+    string algorithm =
+        boost::algorithm::to_lower_copy(decoded_token->decoded_jwt_.get_algorithm());
+    if (decoded_token->decoded_jwt_.has_key_id()) {
+      // Get key id from token's header and use it to retrieve the public key from JWKS.
+      std::string key_id = decoded_token->decoded_jwt_.get_key_id();
+
+      const JWTPublicKey* pub_key = nullptr;
+      if (algorithm.substr(0, 2).compare("hs") == 0) {
+        pub_key = jwks_->LookupHSKey(key_id);
+      } else if (algorithm.substr(0, 2).compare("rs") == 0) {
+        pub_key = jwks_->LookupRSAPublicKey(key_id);
+      } else {
+        return Status(TErrorCode::JWT_VERIFY_FAILED,
+            Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
+      }
+      if (pub_key == nullptr) {
+        return Status(TErrorCode::JWT_VERIFY_FAILED, "Invalid JWK ID in the JWT token");
+      }
+      // Use the public key to verify the token's signature.
+      status = pub_key->Verify(decoded_token->decoded_jwt_, algorithm);
+    } else {
+      // According to RFC 7517 (JSON Web Key), 'kid' is OPTIONAL so it's possible there
+      // is no key id in the token's header. In this case, get all of public keys from
+      // JWKS for the family of algorithms.
+      const JsonWebKeySet::JWTPublicKeyMap* key_map = nullptr;
+      if (algorithm.substr(0, 2).compare("hs") == 0) {
+        key_map = jwks_->GetAllHSKeys();
+      } else if (algorithm.substr(0, 2).compare("rs") == 0) {
+        key_map = jwks_->GetAllRSAPublicKeys();
+      } else {
+        return Status(TErrorCode::JWT_VERIFY_FAILED,
+            Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
+      }
+      if (key_map->size() == 0) {
+        return Status(
+            TErrorCode::JWT_VERIFY_FAILED, "Verification failed, no matching key");
+      }
+      // Try each key of the family until the signature is verified. If no key
+      // succeeds, the status of the last attempt is returned.
+      for (auto& key : *key_map) {
+        status = key.second->Verify(decoded_token->decoded_jwt_, algorithm);
+        if (status.ok()) return status;
+      }
+    }
+  } catch (const std::bad_cast& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Claim was present but not a string, error: $0", e.what()));
+  } catch (const jwt::error::claim_not_present_exception& e) {
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Claim not present in JWT token, error $0", e.what()));
+  } catch (const std::exception& e) {
+    // Fallback for any other exception. The message previously read "varification".
+    status = Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Token verification failed, error: $0", e.what()));
+  }
+  return status;
+}
+
+Status JWTHelper::GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
+    const string& jwt_custom_claim_username, string& username) {
+  // Copies the payload claim named 'jwt_custom_claim_username' into 'username'.
+  // Fails with JWT_VERIFY_FAILED if the claim is missing or its value is empty.
+  DCHECK(decoded_token != nullptr);
+  DCHECK(!jwt_custom_claim_username.empty());
+  try {
+    const auto& jwt_payload = decoded_token->decoded_jwt_;
+    if (!jwt_payload.has_payload_claim(jwt_custom_claim_username)) {
+      return Status(TErrorCode::JWT_VERIFY_FAILED,
+          Substitute("Claim '$0' was not present", jwt_custom_claim_username));
+    }
+    // Assume the claim data type of 'username' is string.
+    username.assign(
+        jwt_payload.get_payload_claim(jwt_custom_claim_username).to_json().to_str());
+    if (username.empty()) {
+      return Status(TErrorCode::JWT_VERIFY_FAILED,
+          Substitute("Claim '$0' is empty", jwt_custom_claim_username));
+    }
+  } catch (const std::runtime_error& e) {
+    return Status(TErrorCode::JWT_VERIFY_FAILED,
+        Substitute("Claim '$0' was not present, error: $1", jwt_custom_claim_username,
+            e.what()));
+  }
+  return Status::OK();
+}
+
+} // namespace impala
\ No newline at end of file
diff --git a/be/src/util/jwt-util.h b/be/src/util/jwt-util.h
new file mode 100644
index 0000000..92e4468
--- /dev/null
+++ b/be/src/util/jwt-util.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_JWT_UTIL_H
+#define IMPALA_JWT_UTIL_H
+
+#include <string>
+
+#include "common/logging.h"
+#include "common/status.h"
+
+namespace impala {
+
+class JsonWebKeySet;
+
+/// JSON Web Token (JWT) is an Internet proposed standard for creating data with optional
+/// signature and/or optional encryption whose payload holds JSON that asserts some
+/// number of claims. The tokens are signed either using a private secret or a public/
+/// private key.
+/// This class works as wrapper for jwt-cpp. It provides APIs to decode/verify JWT token,
+/// and extracts custom claim from the payload of JWT token.
+/// JsonWebKeySet is read only after Init() is called. The class is thread safe.
+class JWTHelper {
+ public:
+  /// Opaque types for storing the JWT decoded token. This allows us to avoid including
+  /// header file jwt-cpp/jwt.h.
+  struct JWTDecodedToken;
+
+  /// Custom deleter: intended for use with std::unique_ptr<JWTDecodedToken>. It is
+  /// defined in the source file, where JWTDecodedToken is a complete type.
+  class TokenDeleter {
+   public:
+    /// Called by unique_ptr to free JWTDecodedToken
+    void operator()(JWTHelper::JWTDecodedToken* token) const;
+  };
+  /// UniqueJWTDecodedToken -- an owning wrapper around the opaque decoded token
+  /// structure. The token is freed through TokenDeleter when the wrapper is destroyed
+  /// or reset.
+  typedef std::unique_ptr<JWTDecodedToken, TokenDeleter> UniqueJWTDecodedToken;
+
+  /// Return the single instance.
+  static JWTHelper* GetInstance() { return jwt_helper_; }
+
+  /// Load JWKS from a JSON file. Returns an error if problems were encountered.
+  Status Init(const std::string& jwks_file_path);
+
+  /// Decode the given JWT token. The decoding result is stored in 'decoded_token_out'.
+  /// Return Status::OK if the decoding is successful.
+  static Status Decode(
+      const std::string& token, UniqueJWTDecodedToken& decoded_token_out);
+
+  /// Verify the token's signature with the given JWKS. The token should be already
+  /// decoded by calling Decode().
+  /// Return Status::OK if the verification is successful.
+  Status Verify(const JWTDecodedToken* decoded_token) const;
+
+  /// Extract the custom claim named 'custom_claim_username' from the payload of the
+  /// decoded JWT token and store its value in 'username'.
+  /// Return Status::OK if the extraction is successful.
+  static Status GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
+      const std::string& custom_claim_username, std::string& username);
+
+  /// Return Json Web Key Set. It's called only by unit-test code.
+  const JsonWebKeySet* GetJWKS() const { return jwks_.get(); }
+
+ private:
+  /// Single instance.
+  static JWTHelper* jwt_helper_;
+
+  /// Set it as TRUE when Init() is called.
+  bool initialized_ = false;
+
+  /// Json Web Key Set (JWKS) for Json Web Token (JWT) verification.
+  /// Only one instance per daemon.
+  std::unique_ptr<JsonWebKeySet> jwks_;
+};
+
+} // namespace impala
+
+#endif
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 74f5c3c..5a6c1e2 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -41,8 +41,8 @@
 #include "kudu/util/env.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/net/sockaddr.h"
-#include "rpc/authentication.h"
 #include "rpc/authentication-util.h"
+#include "rpc/authentication.h"
 #include "rpc/thrift-util.h"
 #include "runtime/exec-env.h"
 #include "service/impala-server.h"
@@ -52,6 +52,7 @@
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
+#include "util/jwt-util.h"
 #include "util/mem-info.h"
 #include "util/metrics.h"
 #include "util/os-info.h"
@@ -150,6 +151,9 @@ DECLARE_string(ssl_minimum_version);
 DECLARE_string(ssl_cipher_list);
 DECLARE_string(trusted_domain);
 DECLARE_bool(trusted_domain_use_xff_header);
+DECLARE_bool(jwt_token_auth);
+DECLARE_bool(jwt_validate_signature);
+DECLARE_string(jwt_custom_claim_username);
 
 static const char* DOC_FOLDER = "/www/";
 static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
@@ -282,7 +286,8 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
     error_handler_(UrlHandler(
         bind<void>(&Webserver::ErrorHandler, this, _1, _2), "error.tmpl", false)),
     use_cookies_(FLAGS_max_cookie_lifetime_s > 0),
-    check_trusted_domain_(!FLAGS_trusted_domain.empty()) {
+    check_trusted_domain_(!FLAGS_trusted_domain.empty()),
+    use_jwt_(FLAGS_jwt_token_auth) {
   http_address_ = MakeNetworkAddress(interface.empty() ? "0.0.0.0" : interface, port);
   Init();
 
@@ -309,6 +314,12 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
     total_trusted_domain_check_success_ =
         metrics->AddCounter("impala.webserver.total-trusted-domain-check-success", 0);
   }
+  if (use_jwt_) {
+    total_jwt_token_auth_success_ =
+        metrics->AddCounter("impala.webserver.total-jwt-token-auth-success", 0);
+    total_jwt_token_auth_failure_ =
+        metrics->AddCounter("impala.webserver.total-jwt-token-auth-failure", 0);
+  }
 }
 
 Webserver::~Webserver() {
@@ -600,8 +611,36 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
   }
 
   vector<string> response_headers;
-  bool authenticated = auth_mode_ != AuthMode::SPNEGO && auth_mode_ != AuthMode::LDAP;
-  // Try authenticating with a cookie first, if enabled.
+  bool authenticated = false;
+  // Try authenticating with JWT token first, if enabled.
+  if (use_jwt_) {
+    const char* auth_value = nullptr;
+    const char* value = sq_get_header(connection, "Authorization");
+    if (value != nullptr) auth_value = StripLeadingWhiteSpace(value);
+    // Check Authorization header with the Bearer authentication scheme as:
+    // Authorization: Bearer <token>
+    // A well-formed JWT consists of three concatenated Base64url-encoded strings,
+    // separated by dots (.).
+    if (auth_value != nullptr && strncasecmp(auth_value, "Bearer ", 7) == 0
+        && strchr(auth_value, '.') != nullptr) {
+      string jwt_token = string(auth_value + 7);
+      StripWhiteSpace(&jwt_token);
+      if (!jwt_token.empty()) {
+        if (JWTTokenAuth(jwt_token, connection, request_info)) {
+          total_jwt_token_auth_success_->Increment(1);
+          authenticated = true;
+          // TODO: cookies are not added, but are not needed right now
+        } else {
+          LOG(INFO) << "Invalid JWT token provided: " << jwt_token;
+          total_jwt_token_auth_failure_->Increment(1);
+        }
+      }
+    }
+  }
+  if (!authenticated) {
+    authenticated = auth_mode_ != AuthMode::SPNEGO && auth_mode_ != AuthMode::LDAP;
+  }
+  // Try authenticating with a cookie, if enabled.
   if (!authenticated && use_cookies_) {
     const char* cookie_header = sq_get_header(connection, "Cookie");
     string username;
@@ -832,6 +871,37 @@ bool Webserver::TrustedDomainCheck(const string& origin, struct sq_connection* c
   return true;
 }
 
+bool Webserver::JWTTokenAuth(const std::string& jwt_token,
+    struct sq_connection* connection, struct sq_request_info* request_info) {
+  // Authenticates a request from 'jwt_token'. Returns true and sets
+  // 'request_info->remote_user' when the token yields a username; every failure is
+  // logged and reported as false. 'connection' is not used here.
+  JWTHelper::UniqueJWTDecodedToken token;
+  const Status decode_status = JWTHelper::Decode(jwt_token, token);
+  if (!decode_status.ok()) {
+    LOG(ERROR) << "Error decoding JWT token in Authorization header, "
+               << "Error: " << decode_status;
+    return false;
+  }
+
+  // The signature is checked only when the jwt_validate_signature flag is set.
+  if (FLAGS_jwt_validate_signature) {
+    const Status verify_status = JWTHelper::GetInstance()->Verify(token.get());
+    if (!verify_status.ok()) {
+      LOG(ERROR) << "Error verifying JWT token in Authorization header, "
+                 << "Error: " << verify_status;
+      return false;
+    }
+  }
+
+  DCHECK(!FLAGS_jwt_custom_claim_username.empty());
+  string username;
+  const Status claim_status = JWTHelper::GetCustomClaimUsername(
+      token.get(), FLAGS_jwt_custom_claim_username, username);
+  if (!claim_status.ok()) {
+    LOG(ERROR) << "Cannot retrieve username from JWT token in Authorization header, "
+               << "Error: " << claim_status;
+    return false;
+  }
+  // The request info receives its own heap copy of the name, made with strdup().
+  request_info->remote_user = strdup(username.c_str());
+  return true;
+}
+
 Status Webserver::HandleBasic(struct sq_connection* connection,
     struct sq_request_info* request_info, vector<string>* response_headers) {
   const char* authz_header = sq_get_header(connection, "Authorization");
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index 96698c1..70438b2 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -201,6 +201,11 @@ class Webserver {
   bool TrustedDomainCheck(const std::string& origin, struct sq_connection* connection,
       struct sq_request_info* request_info);
 
+  /// Returns true if 'jwt_token' decodes, passes signature verification when that is
+  /// enabled, and yields a username, which is stored in 'request_info->remote_user'.
+  bool JWTTokenAuth(const std::string& jwt_token, struct sq_connection* connection,
+      struct sq_request_info* request_info);
+
   // Handle Basic authentication for this request. Returns an error if authentication was
   // unsuccessful.
   Status HandleBasic(struct sq_connection* connection,
@@ -267,6 +272,10 @@ class Webserver {
   /// auth if it originates from a trusted domain.
   bool check_trusted_domain_;
 
+  /// If true, the JWT token in Authorization header will be used for authentication.
+  /// An incoming connection will be accepted if the JWT token could be verified.
+  bool use_jwt_ = false;
+
   /// Used to validate usernames/passwords If LDAP authentication is in use.
   std::unique_ptr<ImpalaLdap> ldap_;
 
@@ -288,6 +297,11 @@ class Webserver {
   /// If 'use_cookies_' is true, metrics for the number of successful
   /// attempts to authorize connections originating from a trusted domain.
   IntCounter* total_trusted_domain_check_success_ = nullptr;
+
+  /// If 'use_jwt_' is true, metrics for the number of successful and failed JWT auth
+  /// attempts.
+  IntCounter* total_jwt_token_auth_success_ = nullptr;
+  IntCounter* total_jwt_token_auth_failure_ = nullptr;
 };
 
 }
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index 580d203..0a53b02 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -438,7 +438,7 @@ def get_toolchain_downloads():
   toolchain_packages += [llvm_package, llvm_package_asserts, gcc_package]
   toolchain_packages += map(ToolchainPackage,
       ["avro", "binutils", "boost", "breakpad", "bzip2", "cctz", "cmake", "crcutil",
-       "flatbuffers", "gdb", "gflags", "glog", "gperftools", "gtest", "libev",
+       "flatbuffers", "gdb", "gflags", "glog", "gperftools", "gtest", "jwt-cpp", "libev",
        "libunwind", "lz4", "openldap", "openssl", "orc", "protobuf", "python",
        "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib", "zstd"])
   # Check whether this platform is supported (or whether a valid custom toolchain
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index d461582..08359a1 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -103,6 +103,8 @@ export IMPALA_GPERFTOOLS_VERSION=2.5-p4
 unset IMPALA_GPERFTOOLS_URL
 export IMPALA_GTEST_VERSION=1.6.0
 unset IMPALA_GTEST_URL
+export IMPALA_JWT_CPP_VERSION=0.5.0
+unset IMPALA_JWT_CPP_URL
 export IMPALA_LIBEV_VERSION=4.20
 unset IMPALA_LIBEV_URL
 export IMPALA_LIBUNWIND_VERSION=1.3-rc1-p3
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 366012e..83b5ad9 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -167,6 +167,7 @@ testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2
 testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2_default.expected.txt
 testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2_extended.expected.txt
 testdata/hive_benchmark/grepTiny/part-00000
+testdata/jwt/jwks_rs256.json
 testdata/tzdb/2017c.zip
 testdata/tzdb/2017c-corrupt.zip
 testdata/tzdb_tiny/*
diff --git a/cmake_modules/FindJwtCpp.cmake b/cmake_modules/FindJwtCpp.cmake
new file mode 100644
index 0000000..c475edf
--- /dev/null
+++ b/cmake_modules/FindJwtCpp.cmake
@@ -0,0 +1,38 @@
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+# - Find jwt-cpp headers.
+# JWT_CPP_ROOT hints the location
+# This module defines JWT_CPP_INCLUDE_DIR, the directory containing headers
+
+set(JWT_CPP_SEARCH_HEADER_PATHS ${JWT_CPP_ROOT}/include)
+
+find_path(JWT_CPP_INCLUDE_DIR jwt-cpp/jwt.h HINTS
+  ${JWT_CPP_SEARCH_HEADER_PATHS})
+
+if (NOT JWT_CPP_INCLUDE_DIR)
+  message(FATAL_ERROR "jwt-cpp headers NOT found.")
+  set(JWT_CPP_FOUND FALSE)
+else()
+  set(JWT_CPP_FOUND TRUE)
+endif ()
+
+mark_as_advanced(
+  JWT_CPP_INCLUDE_DIR
+)
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 4c90674..842262d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -471,6 +471,10 @@ error_codes = (
 
   ("LOCAL_DISK_FAULTY", 152,
    "Query execution failure caused by local disk IO fatal error on backend: $0."),
+
+  ("JWKS_PARSE_ERROR", 153, "Error parsing JWKS: $0."),
+
+  ("JWT_VERIFY_FAILED", 154, "Error verifying JWT Token: $0."),
 )
 
 import sys
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index b962145..c5f600e 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1392,6 +1392,26 @@
     "key": "impala.thrift-server.hiveserver2-http-frontend.total-saml-auth-failure"
   },
   {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that were successfully authenticated using JWT Token.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection JWT Token Success",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-jwt-token-auth-success"
+  },
+  {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that were attempted to authenticate using JWT Token but were unsuccessful.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection JWT Token Failure",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-jwt-token-auth-failure"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"
@@ -2823,7 +2843,7 @@
     "kind": "COUNTER",
     "key": "impala.webserver.total-cookie-auth-failure"
   },
-    {
+  {
     "description": "The number of HTTP connection requests to this daemon's webserver that originated from a trusted domain.",
     "contexts": [
       "IMPALAD",
@@ -2836,6 +2856,30 @@
     "key": "impala.webserver.total-trusted-domain-check-success"
   },
   {
+    "description": "The number of HTTP connection requests to this daemon's webserver that were successfully authenticated using a JWT token.",
+    "contexts": [
+      "IMPALAD",
+      "CATALOGSERVER",
+      "STATESTORE"
+    ],
+    "label": "Webserver HTTP Connection JWT Token Auth Success",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.webserver.total-jwt-token-auth-success"
+  },
+  {
+    "description": "The number of HTTP connection requests to this daemon's webserver that provided an invalid JWT token.",
+    "contexts": [
+      "IMPALAD",
+      "CATALOGSERVER",
+      "STATESTORE"
+    ],
+    "label": "Webserver HTTP Connection JWT Token Auth Failure",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.webserver.total-jwt-token-auth-failure"
+  },
+  {
     "description": "The number of times the FAIL debug action returned an error. For testing only.",
     "contexts": [
       "IMPALAD"
diff --git a/fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java b/fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java
new file mode 100644
index 0000000..931abba
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/JwtHttpTest.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hive.service.rpc.thrift.*;
+import org.apache.impala.util.Metrics;
+import org.apache.thrift.transport.THttpClient;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that hiveserver2 operations over the http interface work as expected when
+ * JWT authentication is being used.
+ */
+public class JwtHttpTest {
+  Metrics metrics = new Metrics();
+
+  public void setUp(String extraArgs) throws Exception {
+    int ret = CustomClusterRunner.StartImpalaCluster(extraArgs);
+    assertEquals(0, ret);  // JUnit order is (expected, actual): startup must return 0.
+  }
+
+  static void verifySuccess(TStatus status) throws Exception {
+    if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
+        || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
+      return;
+    }
+    throw new Exception(status.toString());
+  }
+
+  /**
+   * Executes 'query' and fetches the results. Expects exactly one column to be
+   * returned, whose first string value must be equal to 'expectedResult'.
+   */
+  static TOperationHandle execAndFetch(TCLIService.Iface client,
+      TSessionHandle sessionHandle, String query, String expectedResult)
+      throws Exception {
+    TExecuteStatementReq execReq = new TExecuteStatementReq(sessionHandle, query);
+    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
+    verifySuccess(execResp.getStatus());
+
+    TFetchResultsReq fetchReq = new TFetchResultsReq(
+        execResp.getOperationHandle(), TFetchOrientation.FETCH_NEXT, 1000);
+    TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
+    verifySuccess(fetchResp.getStatus());
+    List<TColumn> columns = fetchResp.getResults().getColumns();
+    assertEquals(1, columns.size());
+    assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
+
+    return execResp.getOperationHandle();
+  }
+
+  private void verifyJwtAuthMetrics(long expectedAuthSuccess, long expectedAuthFailure)
+      throws Exception {
+    long actualAuthSuccess =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-success");
+    assertEquals(expectedAuthSuccess, actualAuthSuccess);
+    long actualAuthFailure =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-failure");
+    assertEquals(expectedAuthFailure, actualAuthFailure);
+  }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the HTTP hiveserver2 endpoint.
+   * Since we don't have a Java version of the JWT library, we use a pre-calculated JWT
+   * token and JWKS. The token and JWK set used in this test case were generated by using
+   * BE unit-test function JwtUtilTest::VerifyJwtRS256.
+   */
+  @Test
+  public void testJwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+        "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+            + "--jwt_allow_without_tls=true",
+        jwksFilename));
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    headers.put("Authorization", "Bearer " + jwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'impala' from JWT token and use it as
+    // login user.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyJwtAuthMetrics(1, 0);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyJwtAuthMetrics(3, 0);
+
+    // case 2: Authenticate fails with invalid JWT token which does not have signature.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    headers.put("Authorization", "Bearer " + invalidJwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals("HTTP Response code: 401", e.getMessage());
+    }
+
+    // case 3: Authenticate fails without "Bearer" token.
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      // JWT authentication is not invoked.
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals("HTTP Response code: 401", e.getMessage());
+    }
+
+    // case 4: Authenticate fails without "Authorization" header.
+    headers.remove("Authorization");  // X-Forwarded-For is still set from case 1.
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Expected exception.");
+    } catch (Exception e) {
+      // JWT authentication is not invoked.
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals("HTTP Response code: 401", e.getMessage());
+    }
+  }
+
+  /**
+   * Tests that connections to the HTTP hiveserver2 endpoint are authenticated with a
+   * JWT token when signature validation is disabled (--jwt_validate_signature=false).
+   */
+  @Test
+  public void testJwtAuthNotVerifySig() throws Exception {
+    // Start Impala with jwt_validate_signature set to false so that the signature of
+    // the JWT token will not be validated.
+    setUp("--jwt_token_auth=true --jwt_validate_signature=false "
+        + "--jwt_allow_without_tls=true");
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    // The Token was generated by BE unit-test function JwtUtilTest::VerifyJwtRS256.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    headers.put("Authorization", "Bearer " + jwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'impala' from JWT token.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyJwtAuthMetrics(1, 0);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyJwtAuthMetrics(3, 0);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java b/fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java
new file mode 100644
index 0000000..5a798b4
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/customcluster/JwtWebserverTest.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.customcluster;
+
+import static org.apache.impala.testutil.LdapUtil.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Range;
+import org.apache.impala.util.Metrics;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests that Web Server works as expected when JWT authentication is being used.
+ */
+public class JwtWebserverTest {
+  private static final Range<Long> zero = Range.closed(0L, 0L);
+
+  Metrics metrics_ = new Metrics(TEST_USER_1, TEST_PASSWORD_1);
+
+  public void setUp(String extraArgs, String startArgs) throws Exception {
+    Map<String, String> env = new HashMap<>();
+    env.put("IMPALA_WEBSERVER_USERNAME", TEST_USER_1);
+    env.put("IMPALA_WEBSERVER_PASSWORD", TEST_PASSWORD_1);
+    int ret = CustomClusterRunner.StartImpalaCluster(extraArgs, env, startArgs);
+    assertEquals(0, ret);  // JUnit order is (expected, actual): startup must return 0.
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    metrics_.Close();
+  }
+
+  private void verifyJwtAuthMetrics(
+      Range<Long> expectedAuthSuccess, Range<Long> expectedAuthFailure) throws Exception {
+    long actualAuthSuccess =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-success");
+    assertTrue("Expected: " + expectedAuthSuccess + ", Actual: " + actualAuthSuccess,
+        expectedAuthSuccess.contains(actualAuthSuccess));
+    long actualAuthFailure =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-failure");
+    assertTrue("Expected: " + expectedAuthFailure + ", Actual: " + actualAuthFailure,
+        expectedAuthFailure.contains(actualAuthFailure));
+  }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the Web Server.
+   * Since we don't have Java version of JWT library, we use pre-calculated JWT token
+   * and JWKS. The token and JWK set used in this test case were generated by using
+   * BE unit-test function JwtUtilTest::VerifyJwtRS256.
+   */
+  @Test
+  public void testWebserverJwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+              "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+                  + "--jwt_allow_without_tls=true",
+              jwksFilename),
+        "");
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    attemptConnection("Bearer " + jwtToken, "127.0.0.1");
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), zero);
+
+    // Case 2: Failed with invalid JWT Token.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    try {
+      attemptConnection("Bearer " + invalidJwtToken, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+
+    // Case 3: Failed without "Bearer" token.
+    try {
+      attemptConnection("Basic VGVzdDFMZGFwOjEyMzQ1", "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    // JWT authentication is not invoked.
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+
+    // Case 4: Failed without "Authorization" header.
+    try {
+      attemptConnection(null, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    // JWT authentication is not invoked.
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+  }
+
+  // Makes a GET call to the Web Server with 'auth_token' as the "Authorization" header
+  // and 'xff_address' as "X-Forwarded-For". A null argument leaves that header out.
+  private void attemptConnection(String auth_token, String xff_address) throws Exception {
+    String url = "http://localhost:25000/?json";
+    URLConnection connection = new URL(url).openConnection();
+    if (auth_token != null) connection.setRequestProperty("Authorization", auth_token);
+    if (xff_address != null) {
+      connection.setRequestProperty("X-Forwarded-For", xff_address);
+    }
+    connection.getInputStream().close();  // Triggers the request; don't leak the stream.
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index 91950aa..8757b1e 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -123,6 +124,18 @@ public class LdapHS2Test {
     assertEquals(expectedAuthSuccess, actualAuthSuccess);
   }
 
+  private void verifyJwtAuthMetrics(long expectedAuthSuccess, long expectedAuthFailure)
+      throws Exception {
+    long actualAuthSuccess =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-success");
+    assertEquals(expectedAuthSuccess, actualAuthSuccess);
+    long actualAuthFailure =
+        (long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-jwt-token-auth-failure");
+    assertEquals(expectedAuthFailure, actualAuthFailure);
+  }
+
   /**
    * Tests LDAP authentication to the HTTP hiveserver2 endpoint.
    */
@@ -404,4 +417,66 @@ public class LdapHS2Test {
     verifyMetrics(4, 1);
     verifyTrustedDomainMetrics(6);
   }
+
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the HTTP hiveserver2 endpoint.
+   */
+  @Test
+  public void testHiveserver2JwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+        "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+            + "--jwt_allow_without_tls=true",
+        jwksFilename));
+    verifyMetrics(0, 0);
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    headers.put("Authorization", "Bearer " + jwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'impala' from JWT token and use it as
+    // login user.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyMetrics(0, 0);
+    verifyJwtAuthMetrics(1, 0);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyMetrics(0, 0);
+    verifyJwtAuthMetrics(3, 0);
+
+    // case 2: Authenticate fails with invalid JWT token which does not have signature.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    headers.put("Authorization", "Bearer " + invalidJwtToken);
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Exception exception.");
+    } catch (Exception e) {
+      verifyJwtAuthMetrics(3, 1);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
index dc6fd89..6e68db6 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
@@ -105,6 +106,18 @@ public class LdapWebserverTest {
         expectedSuccess.contains(actualSuccess));
   }
 
+  private void verifyJwtAuthMetrics(
+      Range<Long> expectedAuthSuccess, Range<Long> expectedAuthFailure) throws Exception {
+    long actualAuthSuccess =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-success");
+    assertTrue("Expected: " + expectedAuthSuccess + ", Actual: " + actualAuthSuccess,
+        expectedAuthSuccess.contains(actualAuthSuccess));
+    long actualAuthFailure =
+        (long) metrics_.getMetric("impala.webserver.total-jwt-token-auth-failure");
+    assertTrue("Expected: " + expectedAuthFailure + ", Actual: " + actualAuthFailure,
+        expectedAuthFailure.contains(actualAuthFailure));
+  }
+
   @Test
   public void testWebserver() throws Exception {
     setUp("", "");
@@ -245,6 +258,45 @@ public class LdapWebserverTest {
     verifyTrustedDomainMetrics(Range.closed(successMetricBefore, successMetricBefore));
   }
 
+  /**
+   * Tests if sessions are authenticated by verifying the JWT token for connections
+   * to the Web Server.
+   */
+  @Test
+  public void testWebserverJwtAuth() throws Exception {
+    String jwksFilename =
+        new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
+    setUp(String.format(
+              "--jwt_token_auth=true --jwt_validate_signature=true --jwks_file_path=%s "
+                  + "--jwt_allow_without_tls=true",
+              jwksFilename),
+        "");
+
+    // Case 1: Authenticate with valid JWT Token in HTTP header.
+    String jwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
+        + "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
+        + "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
+        + "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
+        + "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
+    attemptConnection("Bearer " + jwtToken, "127.0.0.1");
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), zero);
+
+    // Case 2: Failed with invalid JWT Token.
+    String invalidJwtToken =
+        "eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
+        + "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
+        + "W1wYWxhIn0.";
+    try {
+      attemptConnection("Bearer " + invalidJwtToken, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
+  }
+
   // Helper method to make a get call to the webserver using the input basic
   // auth token and x-forward-for token.
   private void attemptConnection(String basic_auth_token, String xff_address)
diff --git a/testdata/jwt/jwks_rs256.json b/testdata/jwt/jwks_rs256.json
new file mode 100644
index 0000000..95483ba
--- /dev/null
+++ b/testdata/jwt/jwks_rs256.json
@@ -0,0 +1,7 @@
+
+{
+  "keys": [
+    { "use": "sig", "kty": "RSA", "kid": "public:c424b67b-fe28-45d7-b015-f79da50b5b21", "alg": "RS256", "n": "uGbXWiK3dQTyCbX5xdE4yCuYp0AF2d15Qq1JSXT_lx8CEcXb9RbDddl8jGDv-spi5qPa8qEHiK7FwV2KpRE983wGPnYsAm9BxLFb4YrLYcDFOIGULuk2FtrPS512Qea1bXASuvYXEpQNpGbnTGVsWXI9C-yjHztqyL2h8P6mlThPY9E9ue2fCqdgixfTFIF9Dm4SLHbphUS2iw7w1JgT69s7of9-I9l5lsJ9cozf1rxrXX4V1u_SotUuNB3Fp8oB4C1fLBEhSlMcUJirz1E8AziMCxS-VrRPDM-zfvpIJg3JljAh3PJHDiLu902v9w-Iplu1WyoB2aPfitxEhRN0Yw", "e": "AQAB" },
+    { "use": "sig", "kty": "RSA", "kid": "public:9b9d0b47-b9ed-4ba6-9180-52fc5b161a3a", "alg": "RS256", "n": "xzYuc22QSst_dS7geYYK5l5kLxU0tayNdixkEQ17ix-CUcUbKIsnyftZxaCYT46rQtXgCaYRdJcbB3hmyrOavkhTpX79xJZnQmfuamMbZBqitvscxW9zRR9tBUL6vdi_0rpoUwPMEh8-Bw7CgYR0FK0DhWYBNDfe9HKcyZEv3max8Cdq18htxjEsdYO0iwzhtKRXomBWTdhD5ykd_fACVTr4-KEY-IeLvubHVmLUhbE5NgWXxrRpGasDqzKhCTmsa2Ysf712rl57SlH0Wz_Mr3F7aM9YpErzeYLrl0GhQr9BVJxOvXcVd4kmY-XkiCcrkyS1cnghnllh-LCwQu1sYw", "e": "AQAB" }
+  ]
+}