Posted to commits@hive.apache.org by yc...@apache.org on 2022/04/20 13:33:45 UTC

[hive] branch master updated: HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi)

This is an automated email from the ASF dual-hosted git repository.

ychena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b7da71856b HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi)
b7da71856b is described below

commit b7da71856b1bb51af68a5ba6890b65f9843f3606
Author: Sourabh Goyal <so...@cloudera.com>
AuthorDate: Wed Apr 20 06:33:37 2022 -0700

    HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi)
    
    * [WIP]HIVE-21456: Thrift over Http for Hive Metastore
    
    Change-Id: Ie610b7351fe6279353c1f781b0602da0f1860443
    
    * Addresses review comments. Also fixes build failure
    
    Change-Id: Idc8dc3448156e7e2715dc9ea979edf007d4d53d4
    
    * fixes test failures
    
    Change-Id: Ibf6210985248f88cf7011b048703e95fd99dee49
    
    * Refactors creation of Binary and HTTP clients in separate methods in HiveMetaStoreClient
    
    Change-Id: Ib080e24eede76104e10458343f85ac746022f16d
    
    * Addresses review comments
    
    Change-Id: I5ec4fb201bd65bc358c38160348b200fc16d730c
    
    * Fixes validation of maxIdleTimeout in metastore http server
    
    Change-Id: I52990b3904cd8d42da9cff9f282e1f099323e3d7
    
    * Disabled HTTP TRACE in embedded jetty server in HMS
    
    Change-Id: Idcdec4ee0ff7d3ded67816cca4505627a1e5b33b
    
    * Addresses review comments
    
    Change-Id: Ie046c512f2095b0d71743a9485620e369dc75b17
    
    * Addresses nits. Adds some more comments
    
    Change-Id: I39b50cd549af62e5d460fa99167c5aab221edaf8
---
 .../java/org/hadoop/hive/jdbc/SSLTestUtils.java    |   6 +
 .../test/java/org/apache/hive/jdbc/TestSSL.java    |  24 +-
 .../hadoop/hive/metastore/HiveMetaStoreClient.java | 352 +++++++++++++--------
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  27 ++
 .../hive/metastore/utils/MetaStoreUtils.java       |  25 ++
 .../hadoop/hive/metastore/utils/SecurityUtils.java |  46 +++
 .../src/test/resources/log4j2.properties           |  11 +-
 .../hadoop/hive/metastore/HiveMetaStore.java       | 285 +++++++++++++++--
 .../hive/metastore/HmsThriftHttpServlet.java       | 113 +++++++
 ...Store.java => TestRemoteHiveHttpMetaStore.java} |  49 +--
 .../hive/metastore/TestRemoteHiveMetaStore.java    |   3 +
 standalone-metastore/pom.xml                       |   6 +
 12 files changed, 752 insertions(+), 195 deletions(-)
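
In short, the patch adds a Thrift-over-HTTP transport mode to the standalone metastore alongside the existing binary mode, selected through new server-side and client-side MetastoreConf properties. As a rough sketch (the property names are the ConfVars introduced below; the values and the programmatic style are illustrative only, and the usual Configuration/MetastoreConf imports are assumed), enabling it could look like:

    // Sketch: switch both the HMS server and its client to HTTP transport.
    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_TRANSPORT_MODE, "http");                  // server side
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http"); // client side
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_HTTP_PATH, "metastore");                  // served as "/metastore/*"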

diff --git a/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java b/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java
index b8e7e3de65..3917a3b457 100644
--- a/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java
+++ b/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java
@@ -67,6 +67,12 @@ public class SSLTestUtils {
             KEY_STORE_TRUST_STORE_PASSWORD);
   }
 
+  public static void setMetastoreHttpsConf(HiveConf conf) {
+    setMetastoreSslConf(conf);
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_TRANSPORT_MODE, "http");
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http");
+  }
+
   public static void clearSslConfOverlay(Map<String, String> confOverlay) {
     confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname, "false");
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java
index 1d170ec309..ec6c65f75a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-@org.junit.Ignore("HIVE-22620")
 public class TestSSL {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestSSL.class);
@@ -65,6 +64,7 @@ public class TestSSL {
   private static final String JAVA_TRUST_STORE_PROP = "javax.net.ssl.trustStore";
   private static final String JAVA_TRUST_STORE_PASS_PROP = "javax.net.ssl.trustStorePassword";
   private static final String JAVA_TRUST_STORE_TYPE_PROP = "javax.net.ssl.trustStoreType";
+  private static final String KEY_MANAGER_FACTORY_ALGORITHM = "SunX509";
 
   private MiniHS2 miniHS2 = null;
   private static HiveConf conf = new HiveConf();
@@ -290,6 +290,7 @@ public class TestSSL {
    * Test SSL client connection to SSL server
    * @throws Exception
    */
+  @Ignore
   @Test
   public void testSSLConnectionWithProperty() throws Exception {
     SSLTestUtils.setSslConfOverlay(confOverlay);
@@ -390,6 +391,7 @@ public class TestSSL {
    * Opening a new connection with this wrong certificate should fail
    * @throws Exception
    */
+  @Ignore
   @Test
   public void testConnectionWrongCertCN() throws Exception {
     // This call sets the default ssl params including the correct keystore in the server config
@@ -437,15 +439,34 @@ public class TestSSL {
    * Test HMS server with SSL
    * @throws Exception
    */
+  @Ignore
   @Test
   public void testMetastoreWithSSL() throws Exception {
     testSSLHMS(true);
   }
 
+  /**
+   * Test HMS server with Thrift over Http + SSL
+   * @throws Exception
+   */
+  @Test
+  public void testMetastoreWithHttps() throws Exception {
+    SSLTestUtils.setMetastoreHttpsConf(conf);
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM,
+        KEY_MANAGER_FACTORY_ALGORITHM);
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_TYPE, KEY_STORE_TRUST_STORE_TYPE);
+    // false flag in testSSLHMS will set key store type for metastore
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM,
+        KEY_MANAGER_FACTORY_ALGORITHM);
+
+    testSSLHMS(false);
+  }
+
   /**
    * Test HMS server with SSL with input keystore type
    * @throws Exception
    */
+  @Ignore
   @Test
   public void testMetastoreWithSSLKeyStoreType() throws Exception {
     testSSLHMS(false);
@@ -511,6 +532,7 @@ public class TestSSL {
    * Test SSL client connection to SSL server
    * @throws Exception
    */
+  @Ignore
   @Test
   public void testSSLConnectionWithKeystoreType() throws Exception {
     SSLTestUtils.setSslConfOverlay(confOverlay);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 81f6083ce4..f1d06d1d34 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -78,12 +78,18 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.protocol.HttpContext;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.THttpClient;
 import org.apache.thrift.transport.layered.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
@@ -594,6 +600,99 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     client.rename_partition_req(req);
   }
 
+  private THttpClient createHttpClient(URI store, boolean useSSL) throws MetaException,
+      TTransportException {
+    String path = MetaStoreUtils.getHttpPath(MetastoreConf.getVar(conf, ConfVars.THRIFT_HTTP_PATH));
+    String httpUrl = (useSSL ? "https://" : "http://") + store.getHost() + ":" + store.getPort() + path;
+
+    String user = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME);
+    if (user == null || user.equals("")) {
+      try {
+        LOG.debug("No username passed in config " + ConfVars.METASTORE_CLIENT_PLAIN_USERNAME.getHiveName() +
+            ". Trying to get the current user from UGI" );
+        user = UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        throw new MetaException("Failed to get client username from UGI");
+      }
+    }
+    final String httpUser = user;
+    THttpClient tHttpClient;
+    HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+    httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() {
+      @Override
+      public void process(HttpRequest httpRequest, HttpContext httpContext)
+          throws HttpException, IOException {
+        httpRequest.addHeader(MetaStoreUtils.USER_NAME_HTTP_HEADER, httpUser);
+      }
+    });
+
+    try {
+      if (useSSL) {
+        String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
+        if (trustStorePath.isEmpty()) {
+          throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH
+              + " Not configured for SSL connection");
+        }
+        String trustStorePassword =
+            MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
+        String trustStoreType =
+            MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim();
+        String trustStoreAlgorithm =
+            MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
+        tHttpClient = SecurityUtils.getThriftHttpsClient(httpUrl, trustStorePath, trustStorePassword,
+            trustStoreAlgorithm, trustStoreType, httpClientBuilder);
+      }  else {
+        tHttpClient = new THttpClient(httpUrl, httpClientBuilder.build());
+      }
+    } catch (Exception e) {
+      if (e instanceof TTransportException) {
+        throw (TTransportException)e;
+      } else {
+        throw new MetaException("Failed to create http transport client to url: " + httpUrl
+            + ". Error:" + e);
+      }
+    }
+    LOG.debug("Created thrift http client for URL: " + httpUrl);
+    return tHttpClient;
+  }
+
+  private TTransport createBinaryClient(URI store, boolean useSSL) throws TTransportException,
+      MetaException {
+    TTransport binaryTransport = null;
+    try {
+      int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
+          ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
+      if (useSSL) {
+        String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
+        if (trustStorePath.isEmpty()) {
+          throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH
+              + " Not configured for SSL connection");
+        }
+        String trustStorePassword =
+            MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
+        String trustStoreType =
+            MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim();
+        String trustStoreAlgorithm =
+            MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
+        binaryTransport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
+            trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
+      } else {
+        binaryTransport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(),
+            clientSocketTimeout);
+      }
+      binaryTransport = createAuthBinaryTransport(store, binaryTransport);
+    } catch (Exception e) {
+      if (e instanceof TTransportException) {
+        throw (TTransportException)e;
+      } else {
+        throw new MetaException("Failed to create binary transport client to url: " + store
+            + ". Error: " + e);
+      }
+    }
+    LOG.debug("Created thrift binary client for URI: " + store);
+    return binaryTransport;
+  }
+
   private void open() throws MetaException {
     isConnected = false;
     TTransportException tte = null;
@@ -602,10 +701,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL);
     String clientAuthMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_AUTH_MODE);
     boolean usePasswordAuth = false;
-    boolean useFramedTransport = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT);
     boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
-    int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
-        ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
+    String transportMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE);
+    boolean isHttpTransportMode = transportMode.equalsIgnoreCase("http");
 
     if (clientAuthMode != null) {
       usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode);
@@ -613,118 +711,18 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
     for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
       for (URI store : metastoreUris) {
-        LOG.info("Trying to connect to metastore with URI ({})", store);
-
+        LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store,
+            transportMode);
         try {
-          if (useSSL) {
-            try {
-              String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
-              if (trustStorePath.isEmpty()) {
-                throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH
-                    + " Not configured for SSL connection");
-              }
-              String trustStorePassword =
-                  MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
-              String trustStoreType =
-                      MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim();
-              String trustStoreAlgorithm =
-                      MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
-
-              // Create an SSL socket and connect
-              transport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
-                  trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
-              final int newCount = connCount.incrementAndGet();
-              LOG.debug(
-                  "Opened an SSL connection to metastore, current connections: {}",
-                  newCount);
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
-                    System.identityHashCode(this), new Exception());
-              }
-            } catch (IOException e) {
-              throw new IllegalArgumentException(e);
-            } catch (TTransportException e) {
-              tte = e;
-              throw new MetaException(e.toString());
-            }
-          } else {
-            try {
-              transport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(), clientSocketTimeout);
-            } catch (TTransportException e) {
-              tte = e;
-              throw new MetaException(e.toString());
-            }
-          }
-
-          if (usePasswordAuth) {
-            // we are using PLAIN Sasl connection with user/password
-            LOG.debug("HMSC::open(): Creating plain authentication thrift connection.");
-            String userName = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME);
-
-            if (null == userName || userName.isEmpty()) {
-              throw new MetaException("No user specified for plain transport.");
-            }
-
-            // The password is not directly provided. It should be obtained from a keystore pointed
-            // by configuration "hadoop.security.credential.provider.path".
-            try {
-              String passwd = null;
-              char[] pwdCharArray = conf.getPassword(userName);
-              if (null != pwdCharArray) {
-                passwd = new String(pwdCharArray);
-              }
-              if (null == passwd) {
-                throw new MetaException("No password found for user " + userName);
-              }
-              // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL)
-              transport = MetaStorePlainSaslHelper.getPlainTransport(userName, passwd, transport);
-            } catch (IOException | TTransportException sasle) {
-              // IOException covers SaslException
-              LOG.error("Could not create client transport", sasle);
-              throw new MetaException(sasle.toString());
-            }
-          } else if (useSasl) {
-            // Wrap thrift connection with SASL for secure connection.
-            try {
-              HadoopThriftAuthBridge.Client authBridge =
-                HadoopThriftAuthBridge.getBridge().createClient();
-
-              // check if we should use delegation tokens to authenticate
-              // the call below gets hold of the tokens if they are set up by hadoop
-              // this should happen on the map/reduce tasks if the client added the
-              // tokens into hadoop's credential store in the front end during job
-              // submission.
-              String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
-              // tokenSig could be null
-              tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
-
-              if (tokenStrForm != null) {
-                LOG.debug("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection.");
-                // authenticate using delegation tokens via the "DIGEST" mechanism
-                transport = authBridge.createClientTransport(null, store.getHost(),
-                    "DIGEST", tokenStrForm, transport,
-                        MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
-              } else {
-                LOG.debug("HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.");
-                String principalConfig =
-                    MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL);
-                transport = authBridge.createClientTransport(
-                    principalConfig, store.getHost(), "KERBEROS", null,
-                    transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
-              }
-            } catch (IOException ioe) {
-              LOG.error("Failed to create client transport", ioe);
-              throw new MetaException(ioe.toString());
-            }
-          } else {
-            if (useFramedTransport) {
-              try {
-                transport = new TFramedTransport(transport);
-              } catch (TTransportException e) {
-                LOG.error("Failed to create client transport", e);
-                throw new MetaException(e.toString());
-              }
+          try {
+            if (isHttpTransportMode) {
+              transport = createHttpClient(store, useSSL);
+            } else {
+              transport = createBinaryClient(store, useSSL);
             }
+          } catch (TTransportException te) {
+            tte = te;
+            throw new MetaException(te.toString());
           }
 
           final TProtocol protocol;
@@ -738,23 +736,33 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
             if (!transport.isOpen()) {
               transport.open();
               final int newCount = connCount.incrementAndGet();
-              LOG.info("Opened a connection to metastore, URI ({}) "
-                  + "current connections: {}", store, newCount);
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
-                    System.identityHashCode(this), new Exception());
+              if (useSSL) {
+                LOG.info(
+                    "Opened an SSL connection to metastore, current connections: {}",
+                    newCount);
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
+                      System.identityHashCode(this), new Exception());
+                }
+              } else {
+                LOG.info("Opened a connection to metastore, URI ({}) "
+                    + "current connections: {}", store, newCount);
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
+                      System.identityHashCode(this), new Exception());
+                }
               }
             }
             isConnected = true;
           } catch (TTransportException e) {
             tte = e;
-            LOG.warn("Failed to connect to the MetaStore Server URI ({})",
-                store);
-            LOG.debug("Failed to connect to the MetaStore Server URI ({})",
-                store, e);
+            String errMsg = String.format("Failed to connect to the MetaStore Server URI (%s) in %s "
+                    + "transport mode",   store, transportMode);
+            LOG.warn(errMsg);
+            LOG.debug(errMsg, e);
           }
 
-          if (isConnected && !useSasl && !usePasswordAuth &&
+          if (isConnected && !useSasl && !usePasswordAuth && !isHttpTransportMode &&
                   MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) {
             // Call set_ugi, only in unsecure mode.
             try {
@@ -773,8 +781,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
           }
         } catch (MetaException e) {
           recentME = e;
-          LOG.error("Failed to connect to metastore with URI (" + store
-              + ") in attempt " + attempt, e);
+          String errMsg = "Failed to connect to metastore with URI (" + store
+              + ") transport mode:" + transportMode + " in attempt " + attempt;
+          LOG.error(errMsg, e);
         }
         if (isConnected) {
           break;
@@ -806,6 +815,99 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     snapshotActiveConf();
   }
 
+  // wraps the underlyingTransport in the appropriate transport based on mode of authentication
+  private TTransport createAuthBinaryTransport(URI store, TTransport underlyingTransport)
+      throws MetaException {
+    boolean isHttpTransportMode =
+        MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE).
+        equalsIgnoreCase("http");
+    Preconditions.checkArgument(!isHttpTransportMode);
+    Preconditions.checkNotNull(underlyingTransport, "Underlying transport should not be null");
+    // default transport is the underlying one
+    TTransport transport = underlyingTransport;
+    boolean useFramedTransport =
+        MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT);
+    boolean useSSL = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
+    boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL);
+    String clientAuthMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_AUTH_MODE);
+    boolean usePasswordAuth = false;
+
+    if (clientAuthMode != null) {
+      usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode);
+    }
+    if (usePasswordAuth) {
+      // we are using PLAIN Sasl connection with user/password
+      LOG.debug("HMSC::open(): Creating plain authentication thrift connection.");
+      String userName = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME);
+
+      if (null == userName || userName.isEmpty()) {
+        throw new MetaException("No user specified for plain transport.");
+      }
+
+      // The password is not directly provided. It should be obtained from a keystore pointed
+      // by configuration "hadoop.security.credential.provider.path".
+      try {
+        String passwd = null;
+        char[] pwdCharArray = conf.getPassword(userName);
+        if (null != pwdCharArray) {
+          passwd = new String(pwdCharArray);
+        }
+        if (null == passwd) {
+          throw new MetaException("No password found for user " + userName);
+        }
+        // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL)
+        transport = MetaStorePlainSaslHelper.getPlainTransport(userName, passwd, underlyingTransport);
+      } catch (IOException | TTransportException sasle) {
+        // IOException covers SaslException
+        LOG.error("Could not create client transport", sasle);
+        throw new MetaException(sasle.toString());
+      }
+    } else if (useSasl) {
+      // Wrap thrift connection with SASL for secure connection.
+      try {
+        HadoopThriftAuthBridge.Client authBridge =
+            HadoopThriftAuthBridge.getBridge().createClient();
+
+        // check if we should use delegation tokens to authenticate
+        // the call below gets hold of the tokens if they are set up by hadoop
+        // this should happen on the map/reduce tasks if the client added the
+        // tokens into hadoop's credential store in the front end during job
+        // submission.
+        String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
+        // tokenSig could be null
+        tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
+
+        if (tokenStrForm != null) {
+          LOG.debug("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection.");
+          // authenticate using delegation tokens via the "DIGEST" mechanism
+          transport = authBridge.createClientTransport(null, store.getHost(),
+              "DIGEST", tokenStrForm, underlyingTransport,
+              MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+        } else {
+          LOG.debug("HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.");
+          String principalConfig =
+              MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL);
+          transport = authBridge.createClientTransport(
+              principalConfig, store.getHost(), "KERBEROS", null,
+              underlyingTransport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+        }
+      } catch (IOException ioe) {
+        LOG.error("Failed to create client transport", ioe);
+        throw new MetaException(ioe.toString());
+      }
+    } else {
+      if (useFramedTransport) {
+        try {
+          transport = new TFramedTransport(transport);
+        } catch (TTransportException e) {
+          LOG.error("Failed to create client transport", e);
+          throw new MetaException(e.toString());
+        }
+      }
+    }
+    return transport;
+  }
+
   private void snapshotActiveConf() {
     currentMetaVars = new HashMap<>(MetastoreConf.metaVars.length);
     for (ConfVars oneVar : MetastoreConf.metaVars) {
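
With the refactor above, open() dispatches to createHttpClient() or createBinaryClient() based on the client transport-mode property, so callers keep using the stock client API. A minimal usage sketch (the URI is a placeholder; imports and checked-exception handling are omitted):

    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_URIS, "thrift://hms-host:9083");          // placeholder metastore URI
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http"); // use the new THttpClient path
    try (HiveMetaStoreClient client = new HiveMetaStoreClient(conf)) {
      client.getAllDatabases().forEach(System.out::println);                                           // any call now travels over HTTP
    }
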
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9cc8d04ceb..51a46e8d42 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1364,6 +1364,13 @@ public class MetastoreConf {
         "Comma-separated list of tasks that will be started in separate threads.  These will be" +
             " started only when the metastore is running as a separate service.  They must " +
             "implement " + METASTORE_TASK_THREAD_CLASS),
+    THRIFT_TRANSPORT_MODE("metastore.server.thrift.transport.mode",
+        "hive.metastore.server.thrift.transport.mode", "binary",
+        "Transport mode for thrift server in Metastore. Can be binary or http"),
+    THRIFT_HTTP_PATH("metastore.server.thrift.http.path",
+        "hive.metastore.server.thrift.http.path",
+        "metastore",
+        "Path component of URL endpoint when in HTTP mode"),
     TCP_KEEP_ALIVE("metastore.server.tcp.keepalive",
         "hive.metastore.server.tcp.keepalive", true,
         "Whether to enable TCP keepalive for the metastore server. Keepalive will prevent accumulation of half-open connections."),
@@ -1504,6 +1511,26 @@ public class MetastoreConf {
     USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false,
         "Comma separated list of users who are in admin role for bootstrapping.\n" +
             "More users can be added in ADMIN role later."),
+    // TODO: Should we have a separate config for the metastore client, or would
+    // THRIFT_TRANSPORT_MODE suffice?
+    METASTORE_CLIENT_THRIFT_TRANSPORT_MODE("metastore.client.thrift.transport.mode",
+        "hive.metastore.client.thrift.transport.mode", "binary",
+        "Transport mode to be used by the metastore client. It should be the same as " + THRIFT_TRANSPORT_MODE),
+    METASTORE_CLIENT_THRIFT_HTTP_PATH("metastore.client.thrift.http.path",
+        "hive.metastore.client.thrift.http.path",
+        "metastore",
+        "Path component of URL endpoint when in HTTP mode"),
+    METASTORE_THRIFT_HTTP_REQUEST_HEADER_SIZE("metastore.server.thrift.http.request.header.size",
+        "hive.metastore.server.thrift.http.request.header.size", 6*1024,
+        "Request header size in bytes when using HTTP transport mode for metastore thrift server."
+            + " Defaults to jetty's defaults"),
+    METASTORE_THRIFT_HTTP_RESPONSE_HEADER_SIZE("metastore.server.thrift.http.response.header.size",
+        "metastore.server.thrift.http.response.header.size", 6*1024,
+        "Response header size in bytes when using HTTP transport mode for metastore thrift server."
+            + " Defaults to jetty's defaults"),
+    METASTORE_THRIFT_HTTP_MAX_IDLE_TIME("metastore.thrift.http.max.idle.time", "hive.metastore.thrift.http.max.idle.time", 
+        1800, TimeUnit.SECONDS,
+        "Maximum idle time for a connection on the server when in HTTP mode."),
     USE_SSL("metastore.use.SSL", "hive.metastore.use.SSL", false,
         "Set this to true for using SSL encryption in HMS server."),
     // We should somehow unify next two options.
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 8854430f7a..d4bcb5b5e9 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -128,6 +128,8 @@ public class MetaStoreUtils {
 
   public static final String NO_VAL = " --- ";
 
+  public static final String USER_NAME_HTTP_HEADER = "x-actor-username";
+
   /**
    * Catches exceptions that cannot be handled and wraps them in MetaException.
    *
@@ -1157,4 +1159,27 @@ public class MetaStoreUtils {
     }
     return result;
   }
+
+  /**
+   * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on.
+   * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*"
+   * @param httpPath the configured path (may be null or empty)
+   * @return the normalized servlet path spec ending in "/*"
+   */
+  public static String getHttpPath(String httpPath) {
+    if (httpPath == null || httpPath.equals("")) {
+      httpPath = "/*";
+    } else {
+      if (!httpPath.startsWith("/")) {
+        httpPath = "/" + httpPath;
+      }
+      if (httpPath.endsWith("/")) {
+        httpPath = httpPath + "*";
+      }
+      if (!httpPath.endsWith("/*")) {
+        httpPath = httpPath + "/*";
+      }
+    }
+    return httpPath;
+  }
 }
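
For reference, the normalization above produces servlet-style path specs; a few illustrative inputs and the results they map to (derived from the logic shown):

    MetaStoreUtils.getHttpPath(null);         // "/*"
    MetaStoreUtils.getHttpPath("metastore");  // "/metastore/*"
    MetaStoreUtils.getHttpPath("/path/");     // "/path/*"
    MetaStoreUtils.getHttpPath("/a/b");       // "/a/b/*"
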
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
index 2b326b2922..cb5b170808 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.hive.metastore.utils;
 
+import com.google.common.base.Preconditions;
+import java.io.FileInputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import javax.net.ssl.SSLContext;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
 import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector;
@@ -27,12 +34,21 @@ import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.thrift.transport.THttpClient;
 import org.apache.thrift.transport.TSSLTransportFactory;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -271,6 +287,36 @@ public class SecurityUtils {
     return getSSLSocketWithHttps(tSSLSocket);
   }
 
+  /*
+  Sets the SSL-related configs in the underlying HTTP client builder and wraps it up
+  in a THttpClient
+   */
+  public static THttpClient getThriftHttpsClient(String httpsUrl, String trustStorePath,
+      String trustStorePasswd, String trustStoreAlgorithm, String trustStoreType,
+      HttpClientBuilder underlyingHttpClientBuilder) throws TTransportException, IOException,
+      KeyStoreException, NoSuchAlgorithmException, CertificateException,
+      KeyManagementException {
+    Preconditions.checkNotNull(underlyingHttpClientBuilder, "httpClientBuilder should not be null");
+    if (trustStoreType == null || trustStoreType.isEmpty()) {
+      trustStoreType = KeyStore.getDefaultType();
+    }
+    KeyStore sslTrustStore = KeyStore.getInstance(trustStoreType);
+    try (FileInputStream fis = new FileInputStream(trustStorePath)) {
+      sslTrustStore.load(fis, trustStorePasswd.toCharArray());
+    }
+
+    SSLContext sslContext =
+        SSLContexts.custom().setTrustManagerFactoryAlgorithm(trustStoreAlgorithm).
+            loadTrustMaterial(sslTrustStore, null).build();
+    SSLConnectionSocketFactory socketFactory =
+        new SSLConnectionSocketFactory(sslContext, new DefaultHostnameVerifier(null));
+    final Registry<ConnectionSocketFactory> registry =
+        RegistryBuilder.<ConnectionSocketFactory> create().register("https", socketFactory)
+            .build();
+    underlyingHttpClientBuilder.setConnectionManager(new BasicHttpClientConnectionManager(registry));
+    return new THttpClient(httpsUrl, underlyingHttpClientBuilder.build());
+  }
+
   // Using endpoint identification algorithm as HTTPS enables us to do
   // CNAMEs/subjectAltName verification
   private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
diff --git a/standalone-metastore/metastore-common/src/test/resources/log4j2.properties b/standalone-metastore/metastore-common/src/test/resources/log4j2.properties
index 32f9b38404..bd4847c51f 100644
--- a/standalone-metastore/metastore-common/src/test/resources/log4j2.properties
+++ b/standalone-metastore/metastore-common/src/test/resources/log4j2.properties
@@ -22,8 +22,17 @@ appenders = console
 appender.console.type = Console
 appender.console.name = STDOUT
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{5} - %msg%n
 
 rootLogger.level = debug
 rootLogger.appenderRefs = stdout
 rootLogger.appenderRef.stdout.ref = STDOUT
+
+loggers = HttpClient, JettyHttpServer
+
+logger.HttpClient.name = org.apache.http.client
+logger.HttpClient.level = INFO
+
+logger.JettyHttpServer.name = org.eclipse.jetty.server
+logger.JettyHttpServer.level = INFO
+
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 6831e225a6..cf0b1ec8c1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.metastore;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager
 import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.LogUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -51,10 +55,24 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.ServerContext;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TServlet;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportFactory;
+
+import org.eclipse.jetty.security.ConstraintMapping;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.security.Constraint;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +85,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -98,6 +114,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
   private static ZooKeeperHiveHelper zooKeeperHelper = null;
   private static String msHost = null;
+  private static ThriftServer thriftServer;
 
   public static boolean isRenameAllowed(Database srcDB, Database destDB) {
     if (!srcDB.getName().equalsIgnoreCase(destDB.getName())) {
@@ -195,7 +212,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           .create('p'));
 
     }
-
     @Override
     public void parse(String[] args) {
       super.parse(args);
@@ -230,6 +246,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
   }
 
+  /*
+  Interface to encapsulate the HTTP and binary Thrift servers for
+  HiveMetastore
+   */
+  private interface ThriftServer {
+    public void start() throws Throwable;
+    public boolean isRunning();
+  }
+
   /**
    * @param args
    */
@@ -346,21 +371,156 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     startMetaStore(port, bridge, conf, false, null);
   }
 
-  /**
-   * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
-   *
-   * @param port The port on which the Thrift server will start to serve
-   * @param bridge
-   * @param conf Configuration overrides
-   * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.)
-   * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true,
-   *  when all of the background threads are scheduled. Useful for testing purposes to wait
-   *  until the MetaStore is fully initialized.
-   * @throws Throwable
-   */
-  public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
-      Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
-    isMetaStoreRemote = true;
+  // TODO: Is it worth trying to use a server that supports HTTP/2?
+  //  Does the Thrift http client support this?
+
+  private static ThriftServer startHttpMetastore(int port, Configuration conf)
+      throws Exception {
+    LOG.info("Attempting to start http metastore server on port: {}", port);
+
+    // This check is likely pointless, especially with the current state of the http
+    // servlet which respects whatever comes in. Putting this in place for the moment
+    // only to enable testing on an otherwise secure cluster.
+    LOG.info(" Checking if security is enabled");
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Logging in via keytab while starting HTTP metastore");
+      // Handle renewal
+      String kerberosName = SecurityUtil.getServerPrincipal(MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL), "0.0.0.0");
+      String keyTabFile = MetastoreConf.getVar(conf, ConfVars.KERBEROS_KEYTAB_FILE);
+      UserGroupInformation.loginUserFromKeytab(kerberosName, keyTabFile);
+    } else {
+      LOG.info("Security is not enabled. Not logging in via keytab");
+    }
+
+    long maxMessageSize = MetastoreConf.getLongVar(conf, ConfVars.SERVER_MAX_MESSAGE_SIZE);
+    int minWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MIN_THREADS);
+    int maxWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MAX_THREADS);
+    // Server thread pool
+    // Start with minWorkerThreads, expand till maxWorkerThreads and reject
+    // subsequent requests
+    final String threadPoolNamePrefix = "HiveMetastore-HttpHandler-Pool";
+    ExecutorService executorService = new ThreadPoolExecutor(
+        minWorkerThreads, maxWorkerThreads, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<>(), new ThreadFactory() {
+      @Override
+      public Thread newThread(@NotNull Runnable r) {
+        Thread newThread = new Thread(r);
+        newThread.setName(threadPoolNamePrefix + ": Thread-" + newThread.getId());
+        return newThread;
+      }
+    });
+    ExecutorThreadPool threadPool = new ExecutorThreadPool((ThreadPoolExecutor) executorService);
+    // HTTP Server
+    org.eclipse.jetty.server.Server server = new Server(threadPool);
+    server.setStopAtShutdown(true);
+
+    ServerConnector connector;
+    final HttpConfiguration httpServerConf = new HttpConfiguration();
+    httpServerConf.setRequestHeaderSize(
+        MetastoreConf.getIntVar(conf, ConfVars.METASTORE_THRIFT_HTTP_REQUEST_HEADER_SIZE));
+    httpServerConf.setResponseHeaderSize(
+        MetastoreConf.getIntVar(conf, ConfVars.METASTORE_THRIFT_HTTP_RESPONSE_HEADER_SIZE));
+
+    final HttpConnectionFactory http = new HttpConnectionFactory(httpServerConf);
+
+    final boolean useSsl  = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
+    String schemeName = useSsl ? "https" : "http";
+    if (useSsl) {
+      String keyStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_PATH).trim();
+      if (keyStorePath.isEmpty()) {
+        throw new IllegalArgumentException(ConfVars.SSL_KEYSTORE_PATH.toString()
+            + " Not configured for SSL connection");
+      }
+      String keyStorePassword =
+          MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD);
+      String keyStoreType =
+          MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
+      String keyStoreAlgorithm =
+          MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
+
+      SslContextFactory sslContextFactory = new SslContextFactory();
+      String[] excludedProtocols = MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(",");
+      LOG.info("HTTP Server SSL: adding excluded protocols: " + Arrays.toString(excludedProtocols));
+      sslContextFactory.addExcludeProtocols(excludedProtocols);
+      LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = "
+          + Arrays.toString(sslContextFactory.getExcludeProtocols()));
+      sslContextFactory.setKeyStorePath(keyStorePath);
+      sslContextFactory.setKeyStorePassword(keyStorePassword);
+      sslContextFactory.setKeyStoreType(keyStoreType);
+      sslContextFactory.setKeyManagerFactoryAlgorithm(keyStoreAlgorithm);
+      connector = new ServerConnector(server, sslContextFactory, http);
+    } else {
+      connector = new ServerConnector(server, http);
+    }
+    connector.setPort(port);
+    connector.setReuseAddress(true);
+    // TODO: What should the idle timeout be for the metastore? Currently it is 30 minutes
+    long maxIdleTimeout = MetastoreConf.getTimeVar(conf, ConfVars.METASTORE_THRIFT_HTTP_MAX_IDLE_TIME,
+        TimeUnit.MILLISECONDS);
+    connector.setIdleTimeout(maxIdleTimeout);
+    // TODO: AcceptQueueSize needs to be higher for HMS
+    connector.setAcceptQueueSize(maxWorkerThreads);
+    // TODO: Connection keepalive configuration?
+
+    server.addConnector(connector);
+    TProcessor processor;
+    boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
+    final TProtocolFactory protocolFactory;
+    if (useCompactProtocol) {
+      protocolFactory = new TCompactProtocol.Factory();
+    } else {
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+
+    HMSHandler baseHandler = new HMSHandler("new db based metaserver",
+        conf);
+    IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
+    processor = new ThriftHiveMetastore.Processor<>(handler);
+    LOG.info("Starting DB backed MetaStore Server with generic processor");
+    TServlet thriftHttpServlet = new HmsThriftHttpServlet(processor, protocolFactory);
+
+    boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL);
+    HMSHandler.LOG.info("Direct SQL optimization = {}",  directSqlEnabled);
+
+    String httpPath =
+        MetaStoreUtils.getHttpPath(
+            MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_HTTP_PATH));
+
+    ServletContextHandler context = new ServletContextHandler(
+        ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
+
+    // Tons of stuff skipped as compared to HS2.
+    // Sessions, XSRF, Compression, path configuration, etc.
+    constraintHttpMethods(context, false);
+    server.setHandler(context);
+
+    context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
+
+
+    return new ThriftServer() {
+      @Override
+      public void start() throws Throwable {
+        HMSHandler.LOG.debug("Starting HTTPServer for HMS");
+        server.setStopAtShutdown(true);
+        server.start();
+        HMSHandler.LOG.info("Started the new HTTPServer for metastore on port [" + port
+            + "]...");
+        HMSHandler.LOG.info("Options.minWorkerThreads = "
+            + minWorkerThreads);
+        HMSHandler.LOG.info("Options.maxWorkerThreads = "
+            + maxWorkerThreads);
+        HMSHandler.LOG.info("Enable SSL = " + useSsl);
+      }
+
+      @Override
+      public boolean isRunning() {
+        return server != null && server.isRunning();
+      }
+    };
+  }
+
+  private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridge bridge,
+      Configuration conf) throws Throwable {
     // Server will create new threads up to max as necessary. After an idle
     // period, it will destroy threads to keep the number of threads in the
     // pool to min.
@@ -396,12 +556,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       inputProtoFactory = new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize);
     }
     IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
-
     TServerSocket serverSocket;
-
     if (useSasl) {
       processor = saslServer.wrapProcessor(
-        new ThriftHiveMetastore.Processor<>(handler));
+          new ThriftHiveMetastore.Processor<>(handler));
       LOG.info("Starting DB backed MetaStore Server in Secure Mode");
     } else {
       // we are in unsecure mode.
@@ -430,9 +588,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       String keyStorePassword =
           MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD);
       String keyStoreType =
-              MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
+          MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
       String keyStoreAlgorithm =
-              MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
+          MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
       // enable SSL support for HMS
       List<String> sslVersionBlacklist = new ArrayList<>();
       for (String sslVersion : MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(",")) {
@@ -488,14 +646,71 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     };
 
     tServer.setServerEventHandler(tServerEventHandler);
-    LOG.info("Started the new metaserver on port [" + port
-        + "]...");
-    LOG.info("Options.minWorkerThreads = "
-        + minWorkerThreads);
-    LOG.info("Options.maxWorkerThreads = "
-        + maxWorkerThreads);
-    LOG.info("TCP keepalive = " + tcpKeepAlive);
-    LOG.info("Enable SSL = " + useSSL);
+    return new ThriftServer() {
+      @Override
+      public void start() throws Throwable {
+        tServer.serve();
+        HMSHandler.LOG.info("Started the new metaserver on port [" + port
+            + "]...");
+        HMSHandler.LOG.info("Options.minWorkerThreads = "
+            + minWorkerThreads);
+        HMSHandler.LOG.info("Options.maxWorkerThreads = "
+            + maxWorkerThreads);
+        HMSHandler.LOG.info("TCP keepalive = " + tcpKeepAlive);
+        HMSHandler.LOG.info("Enable SSL = " + useSSL);
+      }
+
+      @Override
+      public boolean isRunning() {
+        return tServer != null && tServer.isServing();
+      }
+    };
+  }
+
+  private static void constraintHttpMethods(ServletContextHandler ctxHandler, boolean allowOptionsMethod) {
+    Constraint c = new Constraint();
+    c.setAuthenticate(true);
+
+    ConstraintMapping cmt = new ConstraintMapping();
+    cmt.setConstraint(c);
+    cmt.setMethod("TRACE");
+    cmt.setPathSpec("/*");
+
+    ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
+    if (!allowOptionsMethod) {
+      ConstraintMapping cmo = new ConstraintMapping();
+      cmo.setConstraint(c);
+      cmo.setMethod("OPTIONS");
+      cmo.setPathSpec("/*");
+      securityHandler.setConstraintMappings(new ConstraintMapping[] {cmt, cmo});
+    } else {
+      securityHandler.setConstraintMappings(new ConstraintMapping[] {cmt});
+    }
+    ctxHandler.setSecurityHandler(securityHandler);
+  }
+  /**
+   * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
+   *
+   * @param port The port on which the Thrift server will start to serve
+   * @param bridge
+   * @param conf Configuration overrides
+   * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.)
+   * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true,
+   *  when all of the background threads are scheduled. Useful for testing purposes to wait
+   *  until the MetaStore is fully initialized.
+   * @throws Throwable
+   */
+  public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
+      Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
+    isMetaStoreRemote = true;
+    String transportMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "binary");
+    boolean isHttpTransport = transportMode.equalsIgnoreCase("http");
+    if (isHttpTransport) {
+      thriftServer = startHttpMetastore(port, conf);
+    } else {
+      thriftServer = startBinaryMetastore(port, bridge, conf);
+    }
+
     logCompactionParameters(conf);
 
     boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL);
@@ -507,7 +722,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       AtomicBoolean startedServing = new AtomicBoolean();
       startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing,
                 isMetastoreHousekeepingLeader(conf, getServerHostName()), startedBackgroundThreads);
-      signalOtherThreadsToStart(tServer, metaStoreThreadsLock, startCondition, startedServing);
+      signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock, startCondition, startedServing);
     }
 
     // If dynamic service discovery through ZooKeeper is enabled, add this server to the ZooKeeper.
@@ -526,7 +741,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       }
     }
 
-    tServer.serve();
+    thriftServer.start();
   }
 
   private static void logCompactionParameters(Configuration conf) {
@@ -594,7 +809,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
   }
 
-  private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
+  private static void signalOtherThreadsToStart(final ThriftServer thriftServer, final Lock startLock,
                                                 final Condition startCondition,
                                                 final AtomicBoolean startedServing) {
     // A simple thread to wait until the server has started and then signal the other threads to
@@ -608,7 +823,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           } catch (InterruptedException e) {
             LOG.warn("Signalling thread was interrupted: " + e.getMessage());
           }
-        } while (!server.isServing());
+        } while (!thriftServer.isRunning());
         startLock.lock();
         try {
           startedServing.set(true);
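
Since startMetaStore() now dispatches on the server transport mode, an embedded or test caller can bring up the HTTP-mode server through the same entry point. A sketch (the port is an arbitrary example, exception handling is omitted; in HTTP mode start() launches the embedded Jetty server instead of blocking in TThreadPoolServer.serve()):

    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_TRANSPORT_MODE, "http");
    HiveMetaStore.startMetaStore(9090, HadoopThriftAuthBridge.getBridge(), conf);  // 9090 is an example port
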
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java
new file mode 100644
index 0000000000..e58bd5634b
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java
@@ -0,0 +1,113 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Enumeration;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+
+public class HmsThriftHttpServlet extends TServlet {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HmsThriftHttpServlet.class);
+
+  private static final String X_USER = MetaStoreUtils.USER_NAME_HTTP_HEADER;
+
+  private final boolean isSecurityEnabled;
+
+  public HmsThriftHttpServlet(TProcessor processor,
+      TProtocolFactory inProtocolFactory, TProtocolFactory outProtocolFactory) {
+    super(processor, inProtocolFactory, outProtocolFactory);
+    // This should ideally be receiving an instance of the Configuration which is used for the check
+    isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+  }
+
+  public HmsThriftHttpServlet(TProcessor processor,
+      TProtocolFactory protocolFactory) {
+    super(processor, protocolFactory);
+    isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest request,
+      HttpServletResponse response) throws ServletException, IOException {
+
+    Enumeration<String> headerNames = request.getHeaderNames();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Logging headers in request");
+      while (headerNames.hasMoreElements()) {
+        String headerName = headerNames.nextElement();
+        LOG.debug("Header: [{}], Value: [{}]", headerName,
+            request.getHeader(headerName));
+      }
+    }
+    String userFromHeader = request.getHeader(X_USER);
+    if (userFromHeader == null || userFromHeader.isEmpty()) {
+      LOG.error("No user header: {} found", X_USER);
+      response.sendError(HttpServletResponse.SC_FORBIDDEN,
+          "Header: " + X_USER + " missing in the request");
+      return;
+    }
+
+    // TODO: These should ideally be in some kind of a Cache with weak references.
+    // If HMS were to set up some kind of a session, this would go into the session by having
+    // this filter work with a custom Processor, or by setting the username into the session
+    // as is done for HS2.
+    // In the case of HMS, each request is independent and there is no session
+    // information, so the UGI needs to be set up in the Connection layer itself.
+    UserGroupInformation clientUgi;
+    // Temporary, and useless for now. Here only to allow this to work on an otherwise kerberized
+    // server.
+    if (isSecurityEnabled) {
+      LOG.info("Creating proxy user for: {}", userFromHeader);
+      clientUgi = UserGroupInformation.createProxyUser(userFromHeader, UserGroupInformation.getLoginUser());
+    } else {
+      LOG.info("Creating remote user for: {}", userFromHeader);
+      clientUgi = UserGroupInformation.createRemoteUser(userFromHeader);
+    }
+
+
+    PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        HmsThriftHttpServlet.super.doPost(request, response);
+        return null;
+      }
+    };
+
+    try {
+      clientUgi.doAs(action);
+    } catch (InterruptedException | RuntimeException e) {
+      LOG.error("Exception when executing http request as user: " + clientUgi.getUserName(),
+          e);
+      throw new ServletException(e);
+    }
+  }
+}
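
The servlet above rejects any POST that does not carry the user name header (MetaStoreUtils.USER_NAME_HTTP_HEADER) and runs the wrapped Thrift call as that user. A rough sketch of a bare Thrift client exercising the endpoint; the URL (host, port and servlet path), the user name, and the assumption that the server side is configured with the binary protocol are all illustrative:

    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.THttpClient;

    public class RawHttpMetastoreCall {
      public static void main(String[] args) throws Exception {
        // Placeholder endpoint; substitute the host, port and path the server actually listens on.
        THttpClient transport = new THttpClient("http://localhost:9090/metastore");
        // Without this header the servlet answers with HTTP 403.
        transport.setCustomHeader(MetaStoreUtils.USER_NAME_HTTP_HEADER, "hive");
        transport.open();
        ThriftHiveMetastore.Client client =
            new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
        List<String> databases = client.get_all_databases();
        System.out.println("Databases visible over HTTP: " + databases);
        transport.close();
      }
    }
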
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java
similarity index 54%
copy from standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
copy to standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java
index 415988dea3..6e48607d33 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java
@@ -15,50 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hive.metastore;
 
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.experimental.categories.Category;
-
 
 @Category(MetastoreCheckinTest.class)
-public class TestRemoteHiveMetaStore extends TestHiveMetaStore {
-  private static boolean isServerStarted = false;
-  protected static int port;
+public class TestRemoteHiveHttpMetaStore extends TestRemoteHiveMetaStore {
 
-  public TestRemoteHiveMetaStore() {
-    super();
-    isThriftClient = true;
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-
-    if (isServerStarted) {
-      Assert.assertNotNull("Unable to connect to the MetaStore server", client);
-      return;
-    }
+  private static final Logger LOG = LoggerFactory.getLogger(TestRemoteHiveHttpMetaStore.class);
 
-    port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
-        conf);
-    System.out.println("Starting MetaStore Server on port " + port);
-    isServerStarted = true;
-
-    // This is default case with setugi off for both client and server
-    client = createClient();
+  @Override
+  public void start() throws Exception {
+    MetastoreConf.setVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "http");
+    LOG.info("Attempting to start test remote metastore in http mode");
+    super.start();
+    LOG.info("Successfully started test remote metastore in http mode");
   }
 
   @Override
   protected HiveMetaStoreClient createClient() throws Exception {
-    MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port);
-    MetastoreConf.setBoolVar(conf, ConfVars.EXECUTE_SET_UGI, false);
-    return new HiveMetaStoreClient(conf);
+    MetastoreConf.setVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http");
+    return super.createClient();
   }
-}
\ No newline at end of file
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
index 415988dea3..9b07321529 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
@@ -45,7 +45,10 @@ public class TestRemoteHiveMetaStore extends TestHiveMetaStore {
       Assert.assertNotNull("Unable to connect to the MetaStore server", client);
       return;
     }
+    start();
+  }
 
+  protected void start() throws Exception {
     port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
         conf);
     System.out.println("Starting MetaStore Server on port " + port);
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index 8b242263ce..48fbe91487 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -103,6 +103,7 @@
     <spotbugs.version>4.0.3</spotbugs.version>
     <caffeine.version>2.8.4</caffeine.version>
     <slf4j.version>1.7.30</slf4j.version>
+    <httpcomponents.core.version>4.4.13</httpcomponents.core.version>
     <!-- Thrift properties -->
     <thrift.home>you-must-set-this-to-run-thrift</thrift.home>
     <thrift.gen.dir>${basedir}/src/gen/thrift</thrift.gen.dir>
@@ -361,6 +362,11 @@
         <scope>runtime</scope>
         <optional>true</optional>
       </dependency>
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpcore</artifactId>
+        <version>${httpcomponents.core.version}</version>
+      </dependency>
       <!-- test scope dependencies -->
       <dependency>
         <groupId>junit</groupId>