You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by yc...@apache.org on 2022/04/20 13:33:45 UTC
[hive] branch master updated: HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi)
This is an automated email from the ASF dual-hosted git repository.
ychena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b7da71856b HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi)
b7da71856b is described below
commit b7da71856b1bb51af68a5ba6890b65f9843f3606
Author: Sourabh Goyal <so...@cloudera.com>
AuthorDate: Wed Apr 20 06:33:37 2022 -0700
HIVE-21456: Thrift over Http for Hive Metastore (#3105) (Sourabh Goyal, reviewed by Sai Gantasala, Peter Vary, Naveen and Yongzhi)
* [WIP]HIVE-21456: Thrift over Http for Hive Metastore
Change-Id: Ie610b7351fe6279353c1f781b0602da0f1860443
* Addresses review comments. Also fixes build failure
Change-Id: Idc8dc3448156e7e2715dc9ea979edf007d4d53d4
* fixes test failures
Change-Id: Ibf6210985248f88cf7011b048703e95fd99dee49
* Refactors creation of Binary and HTTP clients in separate methods in HiveMetastoreClient
Change-Id: Ib080e24eede76104e10458343f85ac746022f16d
* Addresses review comments
Change-Id: I5ec4fb201bd65bc358c38160348b200fc16d730c
* Fixes validation of maxIdleTimeout in metastore http server
Change-Id: I52990b3904cd8d42da9cff9f282e1f099323e3d7
* Disabled HTTP TRACE in embedded jetty server in HMS
Change-Id: Idcdec4ee0ff7d3ded67816cca4505627a1e5b33b
* Addresses review comments
Change-Id: Ie046c512f2095b0d71743a9485620e369dc75b17
* Addresses nits. Adds some more comments
Change-Id: I39b50cd549af62e5d460fa99167c5aab221edaf8
---
.../java/org/hadoop/hive/jdbc/SSLTestUtils.java | 6 +
.../test/java/org/apache/hive/jdbc/TestSSL.java | 24 +-
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 352 +++++++++++++--------
.../hadoop/hive/metastore/conf/MetastoreConf.java | 27 ++
.../hive/metastore/utils/MetaStoreUtils.java | 25 ++
.../hadoop/hive/metastore/utils/SecurityUtils.java | 46 +++
.../src/test/resources/log4j2.properties | 11 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 285 +++++++++++++++--
.../hive/metastore/HmsThriftHttpServlet.java | 113 +++++++
...Store.java => TestRemoteHiveHttpMetaStore.java} | 49 +--
.../hive/metastore/TestRemoteHiveMetaStore.java | 3 +
standalone-metastore/pom.xml | 6 +
12 files changed, 752 insertions(+), 195 deletions(-)
diff --git a/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java b/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java
index b8e7e3de65..3917a3b457 100644
--- a/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java
+++ b/itests/hive-unit/src/main/java/org/hadoop/hive/jdbc/SSLTestUtils.java
@@ -67,6 +67,12 @@ public class SSLTestUtils {
KEY_STORE_TRUST_STORE_PASSWORD);
}
+ public static void setMetastoreHttpsConf(HiveConf conf) {
+ setMetastoreSslConf(conf);
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.THRIFT_TRANSPORT_MODE, "http");
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http");
+ }
+
public static void clearSslConfOverlay(Map<String, String> confOverlay) {
confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname, "false");
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java
index 1d170ec309..ec6c65f75a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestSSL.java
@@ -53,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@org.junit.Ignore("HIVE-22620")
public class TestSSL {
private static final Logger LOG = LoggerFactory.getLogger(TestSSL.class);
@@ -65,6 +64,7 @@ public class TestSSL {
private static final String JAVA_TRUST_STORE_PROP = "javax.net.ssl.trustStore";
private static final String JAVA_TRUST_STORE_PASS_PROP = "javax.net.ssl.trustStorePassword";
private static final String JAVA_TRUST_STORE_TYPE_PROP = "javax.net.ssl.trustStoreType";
+ private static final String KEY_MANAGER_FACTORY_ALGORITHM = "SunX509";
private MiniHS2 miniHS2 = null;
private static HiveConf conf = new HiveConf();
@@ -290,6 +290,7 @@ public class TestSSL {
* Test SSL client connection to SSL server
* @throws Exception
*/
+ @Ignore
@Test
public void testSSLConnectionWithProperty() throws Exception {
SSLTestUtils.setSslConfOverlay(confOverlay);
@@ -390,6 +391,7 @@ public class TestSSL {
* Opening a new connection with this wrong certificate should fail
* @throws Exception
*/
+ @Ignore
@Test
public void testConnectionWrongCertCN() throws Exception {
// This call sets the default ssl params including the correct keystore in the server config
@@ -437,15 +439,34 @@ public class TestSSL {
* Test HMS server with SSL
* @throws Exception
*/
+ @Ignore
@Test
public void testMetastoreWithSSL() throws Exception {
testSSLHMS(true);
}
+ /**
+ * Test HMS server with Thrift over Http + SSL
+ * @throws Exception
+ */
+ @Test
+ public void testMetastoreWithHttps() throws Exception {
+ SSLTestUtils.setMetastoreHttpsConf(conf);
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM,
+ KEY_MANAGER_FACTORY_ALGORITHM);
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_TYPE, KEY_STORE_TRUST_STORE_TYPE);
+ // false flag in testSSLHMS will set key store type for metastore
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM,
+ KEY_MANAGER_FACTORY_ALGORITHM);
+
+ testSSLHMS(false);
+ }
+
/**
* Test HMS server with SSL with input keystore type
* @throws Exception
*/
+ @Ignore
@Test
public void testMetastoreWithSSLKeyStoreType() throws Exception {
testSSLHMS(false);
@@ -511,6 +532,7 @@ public class TestSSL {
* Test SSL client connection to SSL server
* @throws Exception
*/
+ @Ignore
@Test
public void testSSLConnectionWithKeystoreType() throws Exception {
SSLTestUtils.setSslConfOverlay(confOverlay);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 81f6083ce4..f1d06d1d34 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -78,12 +78,18 @@ import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.protocol.HttpContext;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
@@ -594,6 +600,99 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
client.rename_partition_req(req);
}
+ private THttpClient createHttpClient(URI store, boolean useSSL) throws MetaException,
+ TTransportException {
+ String path = MetaStoreUtils.getHttpPath(MetastoreConf.getVar(conf, ConfVars.THRIFT_HTTP_PATH));
+ String httpUrl = (useSSL ? "https://" : "http://") + store.getHost() + ":" + store.getPort() + path;
+
+ String user = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME);
+ if (user == null || user.equals("")) {
+ try {
+ LOG.debug("No username passed in config " + ConfVars.METASTORE_CLIENT_PLAIN_USERNAME.getHiveName() +
+ ". Trying to get the current user from UGI" );
+ user = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ throw new MetaException("Failed to get client username from UGI");
+ }
+ }
+ final String httpUser = user;
+ THttpClient tHttpClient;
+ HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+ httpClientBuilder.addInterceptorFirst(new HttpRequestInterceptor() {
+ @Override
+ public void process(HttpRequest httpRequest, HttpContext httpContext)
+ throws HttpException, IOException {
+ httpRequest.addHeader(MetaStoreUtils.USER_NAME_HTTP_HEADER, httpUser);
+ }
+ });
+
+ try {
+ if (useSSL) {
+ String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
+ if (trustStorePath.isEmpty()) {
+ throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH
+ + " Not configured for SSL connection");
+ }
+ String trustStorePassword =
+ MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
+ String trustStoreType =
+ MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim();
+ String trustStoreAlgorithm =
+ MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
+ tHttpClient = SecurityUtils.getThriftHttpsClient(httpUrl, trustStorePath, trustStorePassword,
+ trustStoreAlgorithm, trustStoreType, httpClientBuilder);
+ } else {
+ tHttpClient = new THttpClient(httpUrl, httpClientBuilder.build());
+ }
+ } catch (Exception e) {
+ if (e instanceof TTransportException) {
+ throw (TTransportException)e;
+ } else {
+ throw new MetaException("Failed to create http transport client to url: " + httpUrl
+ + ". Error:" + e);
+ }
+ }
+ LOG.debug("Created thrift http client for URL: " + httpUrl);
+ return tHttpClient;
+ }
+
+ private TTransport createBinaryClient(URI store, boolean useSSL) throws TTransportException,
+ MetaException {
+ TTransport binaryTransport = null;
+ try {
+ int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
+ ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (useSSL) {
+ String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
+ if (trustStorePath.isEmpty()) {
+ throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH
+ + " Not configured for SSL connection");
+ }
+ String trustStorePassword =
+ MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
+ String trustStoreType =
+ MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim();
+ String trustStoreAlgorithm =
+ MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
+ binaryTransport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
+ trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
+ } else {
+ binaryTransport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(),
+ clientSocketTimeout);
+ }
+ binaryTransport = createAuthBinaryTransport(store, binaryTransport);
+ } catch (Exception e) {
+ if (e instanceof TTransportException) {
+ throw (TTransportException)e;
+ } else {
+ throw new MetaException("Failed to create binary transport client to url: " + store
+ + ". Error: " + e);
+ }
+ }
+ LOG.debug("Created thrift binary client for URI: " + store);
+ return binaryTransport;
+ }
+
private void open() throws MetaException {
isConnected = false;
TTransportException tte = null;
@@ -602,10 +701,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL);
String clientAuthMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_AUTH_MODE);
boolean usePasswordAuth = false;
- boolean useFramedTransport = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT);
boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
- int clientSocketTimeout = (int) MetastoreConf.getTimeVar(conf,
- ConfVars.CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
+ String transportMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE);
+ boolean isHttpTransportMode = transportMode.equalsIgnoreCase("http");
if (clientAuthMode != null) {
usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode);
@@ -613,118 +711,18 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
for (URI store : metastoreUris) {
- LOG.info("Trying to connect to metastore with URI ({})", store);
-
+ LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store,
+ transportMode);
try {
- if (useSSL) {
- try {
- String trustStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_PATH).trim();
- if (trustStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.SSL_TRUSTSTORE_PATH
- + " Not configured for SSL connection");
- }
- String trustStorePassword =
- MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_TRUSTSTORE_PASSWORD);
- String trustStoreType =
- MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTSTORE_TYPE).trim();
- String trustStoreAlgorithm =
- MetastoreConf.getVar(conf, ConfVars.SSL_TRUSTMANAGERFACTORY_ALGORITHM).trim();
-
- // Create an SSL socket and connect
- transport = SecurityUtils.getSSLSocket(store.getHost(), store.getPort(), clientSocketTimeout,
- trustStorePath, trustStorePassword, trustStoreType, trustStoreAlgorithm);
- final int newCount = connCount.incrementAndGet();
- LOG.debug(
- "Opened an SSL connection to metastore, current connections: {}",
- newCount);
- if (LOG.isTraceEnabled()) {
- LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
- System.identityHashCode(this), new Exception());
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- } catch (TTransportException e) {
- tte = e;
- throw new MetaException(e.toString());
- }
- } else {
- try {
- transport = new TSocket(new TConfiguration(),store.getHost(), store.getPort(), clientSocketTimeout);
- } catch (TTransportException e) {
- tte = e;
- throw new MetaException(e.toString());
- }
- }
-
- if (usePasswordAuth) {
- // we are using PLAIN Sasl connection with user/password
- LOG.debug("HMSC::open(): Creating plain authentication thrift connection.");
- String userName = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME);
-
- if (null == userName || userName.isEmpty()) {
- throw new MetaException("No user specified for plain transport.");
- }
-
- // The password is not directly provided. It should be obtained from a keystore pointed
- // by configuration "hadoop.security.credential.provider.path".
- try {
- String passwd = null;
- char[] pwdCharArray = conf.getPassword(userName);
- if (null != pwdCharArray) {
- passwd = new String(pwdCharArray);
- }
- if (null == passwd) {
- throw new MetaException("No password found for user " + userName);
- }
- // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL)
- transport = MetaStorePlainSaslHelper.getPlainTransport(userName, passwd, transport);
- } catch (IOException | TTransportException sasle) {
- // IOException covers SaslException
- LOG.error("Could not create client transport", sasle);
- throw new MetaException(sasle.toString());
- }
- } else if (useSasl) {
- // Wrap thrift connection with SASL for secure connection.
- try {
- HadoopThriftAuthBridge.Client authBridge =
- HadoopThriftAuthBridge.getBridge().createClient();
-
- // check if we should use delegation tokens to authenticate
- // the call below gets hold of the tokens if they are set up by hadoop
- // this should happen on the map/reduce tasks if the client added the
- // tokens into hadoop's credential store in the front end during job
- // submission.
- String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
- // tokenSig could be null
- tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
-
- if (tokenStrForm != null) {
- LOG.debug("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection.");
- // authenticate using delegation tokens via the "DIGEST" mechanism
- transport = authBridge.createClientTransport(null, store.getHost(),
- "DIGEST", tokenStrForm, transport,
- MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
- } else {
- LOG.debug("HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.");
- String principalConfig =
- MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL);
- transport = authBridge.createClientTransport(
- principalConfig, store.getHost(), "KERBEROS", null,
- transport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
- }
- } catch (IOException ioe) {
- LOG.error("Failed to create client transport", ioe);
- throw new MetaException(ioe.toString());
- }
- } else {
- if (useFramedTransport) {
- try {
- transport = new TFramedTransport(transport);
- } catch (TTransportException e) {
- LOG.error("Failed to create client transport", e);
- throw new MetaException(e.toString());
- }
+ try {
+ if (isHttpTransportMode) {
+ transport = createHttpClient(store, useSSL);
+ } else {
+ transport = createBinaryClient(store, useSSL);
}
+ } catch (TTransportException te) {
+ tte = te;
+ throw new MetaException(te.toString());
}
final TProtocol protocol;
@@ -738,23 +736,33 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
if (!transport.isOpen()) {
transport.open();
final int newCount = connCount.incrementAndGet();
- LOG.info("Opened a connection to metastore, URI ({}) "
- + "current connections: {}", store, newCount);
- if (LOG.isTraceEnabled()) {
- LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
- System.identityHashCode(this), new Exception());
+ if (useSSL) {
+ LOG.info(
+ "Opened an SSL connection to metastore, current connections: {}",
+ newCount);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
+ System.identityHashCode(this), new Exception());
+ }
+ } else {
+ LOG.info("Opened a connection to metastore, URI ({}) "
+ + "current connections: {}", store, newCount);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
+ System.identityHashCode(this), new Exception());
+ }
}
}
isConnected = true;
} catch (TTransportException e) {
tte = e;
- LOG.warn("Failed to connect to the MetaStore Server URI ({})",
- store);
- LOG.debug("Failed to connect to the MetaStore Server URI ({})",
- store, e);
+ String errMsg = String.format("Failed to connect to the MetaStore Server URI (%s) in %s "
+ + "transport mode", store, transportMode);
+ LOG.warn(errMsg);
+ LOG.debug(errMsg, e);
}
- if (isConnected && !useSasl && !usePasswordAuth &&
+ if (isConnected && !useSasl && !usePasswordAuth && !isHttpTransportMode &&
MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) {
// Call set_ugi, only in unsecure mode.
try {
@@ -773,8 +781,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
} catch (MetaException e) {
recentME = e;
- LOG.error("Failed to connect to metastore with URI (" + store
- + ") in attempt " + attempt, e);
+ String errMsg = "Failed to connect to metastore with URI (" + store
+ + ") transport mode:" + transportMode + " in attempt " + attempt;
+ LOG.error(errMsg, e);
}
if (isConnected) {
break;
@@ -806,6 +815,99 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
snapshotActiveConf();
}
+ // wraps the underlyingTransport in the appropriate transport based on mode of authentication
+ private TTransport createAuthBinaryTransport(URI store, TTransport underlyingTransport)
+ throws MetaException {
+ boolean isHttpTransportMode =
+ MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE).
+ equalsIgnoreCase("http");
+ Preconditions.checkArgument(!isHttpTransportMode);
+ Preconditions.checkNotNull(underlyingTransport, "Underlying transport should not be null");
+ // default transport is the underlying one
+ TTransport transport = underlyingTransport;
+ boolean useFramedTransport =
+ MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_FRAMED_TRANSPORT);
+ boolean useSSL = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
+ boolean useSasl = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL);
+ String clientAuthMode = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_AUTH_MODE);
+ boolean usePasswordAuth = false;
+
+ if (clientAuthMode != null) {
+ usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode);
+ }
+ if (usePasswordAuth) {
+ // we are using PLAIN Sasl connection with user/password
+ LOG.debug("HMSC::open(): Creating plain authentication thrift connection.");
+ String userName = MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_PLAIN_USERNAME);
+
+ if (null == userName || userName.isEmpty()) {
+ throw new MetaException("No user specified for plain transport.");
+ }
+
+ // The password is not directly provided. It should be obtained from a keystore pointed
+ // by configuration "hadoop.security.credential.provider.path".
+ try {
+ String passwd = null;
+ char[] pwdCharArray = conf.getPassword(userName);
+ if (null != pwdCharArray) {
+ passwd = new String(pwdCharArray);
+ }
+ if (null == passwd) {
+ throw new MetaException("No password found for user " + userName);
+ }
+ // Overlay the SASL transport on top of the base socket transport (SSL or non-SSL)
+ transport = MetaStorePlainSaslHelper.getPlainTransport(userName, passwd, underlyingTransport);
+ } catch (IOException | TTransportException sasle) {
+ // IOException covers SaslException
+ LOG.error("Could not create client transport", sasle);
+ throw new MetaException(sasle.toString());
+ }
+ } else if (useSasl) {
+ // Wrap thrift connection with SASL for secure connection.
+ try {
+ HadoopThriftAuthBridge.Client authBridge =
+ HadoopThriftAuthBridge.getBridge().createClient();
+
+ // check if we should use delegation tokens to authenticate
+ // the call below gets hold of the tokens if they are set up by hadoop
+ // this should happen on the map/reduce tasks if the client added the
+ // tokens into hadoop's credential store in the front end during job
+ // submission.
+ String tokenSig = MetastoreConf.getVar(conf, ConfVars.TOKEN_SIGNATURE);
+ // tokenSig could be null
+ tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
+
+ if (tokenStrForm != null) {
+ LOG.debug("HMSC::open(): Found delegation token. Creating DIGEST-based thrift connection.");
+ // authenticate using delegation tokens via the "DIGEST" mechanism
+ transport = authBridge.createClientTransport(null, store.getHost(),
+ "DIGEST", tokenStrForm, underlyingTransport,
+ MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+ } else {
+ LOG.debug("HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.");
+ String principalConfig =
+ MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL);
+ transport = authBridge.createClientTransport(
+ principalConfig, store.getHost(), "KERBEROS", null,
+ underlyingTransport, MetaStoreUtils.getMetaStoreSaslProperties(conf, useSSL));
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to create client transport", ioe);
+ throw new MetaException(ioe.toString());
+ }
+ } else {
+ if (useFramedTransport) {
+ try {
+ transport = new TFramedTransport(transport);
+ } catch (TTransportException e) {
+ LOG.error("Failed to create client transport", e);
+ throw new MetaException(e.toString());
+ }
+ }
+ }
+ return transport;
+ }
+
private void snapshotActiveConf() {
currentMetaVars = new HashMap<>(MetastoreConf.metaVars.length);
for (ConfVars oneVar : MetastoreConf.metaVars) {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9cc8d04ceb..51a46e8d42 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1364,6 +1364,13 @@ public class MetastoreConf {
"Comma-separated list of tasks that will be started in separate threads. These will be" +
" started only when the metastore is running as a separate service. They must " +
"implement " + METASTORE_TASK_THREAD_CLASS),
+ THRIFT_TRANSPORT_MODE("metastore.server.thrift.transport.mode",
+ "hive.metastore.server.thrift.transport.mode", "binary",
+ "Transport mode for thrift server in Metastore. Can be binary or http"),
+ THRIFT_HTTP_PATH("metastore.server.thrift.http.path",
+ "hive.metastore.server.thrift.http.path",
+ "metastore",
+ "Path component of URL endpoint when in HTTP mode"),
TCP_KEEP_ALIVE("metastore.server.tcp.keepalive",
"hive.metastore.server.tcp.keepalive", true,
"Whether to enable TCP keepalive for the metastore server. Keepalive will prevent accumulation of half-open connections."),
@@ -1504,6 +1511,26 @@ public class MetastoreConf {
USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false,
"Comma separated list of users who are in admin role for bootstrapping.\n" +
"More users can be added in ADMIN role later."),
+ // TODO: Should we have a separate config for the metastoreclient or THRIFT_TRANSPORT_MODE
+ // would suffice ?
+ METASTORE_CLIENT_THRIFT_TRANSPORT_MODE("metastore.client.thrift.transport.mode",
+ "hive.metastore.client.thrift.transport.mode", "binary",
+ "Transport mode to be used by the metastore client. It should be the same as " + THRIFT_TRANSPORT_MODE),
+ METASTORE_CLIENT_THRIFT_HTTP_PATH("metastore.client.thrift.http.path",
+ "hive.metastore.client.thrift.http.path",
+ "metastore",
+ "Path component of URL endpoint when in HTTP mode"),
+ METASTORE_THRIFT_HTTP_REQUEST_HEADER_SIZE("metastore.server.thrift.http.request.header.size",
+ "hive.metastore.server.thrift.http.request.header.size", 6*1024,
+ "Request header size in bytes when using HTTP transport mode for metastore thrift server."
+ + " Defaults to jetty's defaults"),
+ METASTORE_THRIFT_HTTP_RESPONSE_HEADER_SIZE("metastore.server.thrift.http.response.header.size",
+ "metastore.server.thrift.http.response.header.size", 6*1024,
+ "Response header size in bytes when using HTTP transport mode for metastore thrift server."
+ + " Defaults to jetty's defaults"),
+ METASTORE_THRIFT_HTTP_MAX_IDLE_TIME("metastore.thrift.http.max.idle.time", "hive.metastore.thrift.http.max.idle.time",
+ 1800, TimeUnit.SECONDS,
+ "Maximum idle time for a connection on the server when in HTTP mode."),
USE_SSL("metastore.use.SSL", "hive.metastore.use.SSL", false,
"Set this to true for using SSL encryption in HMS server."),
// We should somehow unify next two options.
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
index 8854430f7a..d4bcb5b5e9 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java
@@ -128,6 +128,8 @@ public class MetaStoreUtils {
public static final String NO_VAL = " --- ";
+ public static final String USER_NAME_HTTP_HEADER = "x-actor-username";
+
/**
* Catches exceptions that cannot be handled and wraps them in MetaException.
*
@@ -1157,4 +1159,27 @@ public class MetaStoreUtils {
}
return result;
}
+
+ /**
+ * The config parameter can be like "path", "/path", "/path/", "path/*", "/path1/path2/*" and so on.
+ * httpPath should end up as "/*", "/path/*" or "/path1/../pathN/*"
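+ * @param httpPath the configured HTTP path; may be null or empty, in which case "/*" is used
+ * @return the normalized path, which always starts with "/" and ends with "/*"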
+ */
+ public static String getHttpPath(String httpPath) {
+ if (httpPath == null || httpPath.equals("")) {
+ httpPath = "/*";
+ } else {
+ if (!httpPath.startsWith("/")) {
+ httpPath = "/" + httpPath;
+ }
+ if (httpPath.endsWith("/")) {
+ httpPath = httpPath + "*";
+ }
+ if (!httpPath.endsWith("/*")) {
+ httpPath = httpPath + "/*";
+ }
+ }
+ return httpPath;
+ }
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
index 2b326b2922..cb5b170808 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hive.metastore.utils;
+import com.google.common.base.Preconditions;
+import java.io.FileInputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import javax.net.ssl.SSLContext;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector;
@@ -27,12 +34,21 @@ import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,6 +287,36 @@ public class SecurityUtils {
return getSSLSocketWithHttps(tSSLSocket);
}
+ /*
+ Sets the SSL-related configs on the underlying HTTP client builder and wraps the
+ resulting client in a THttpClient
+ */
+ public static THttpClient getThriftHttpsClient(String httpsUrl, String trustStorePath,
+ String trustStorePasswd, String trustStoreAlgorithm, String trustStoreType,
+ HttpClientBuilder underlyingHttpClientBuilder) throws TTransportException, IOException,
+ KeyStoreException, NoSuchAlgorithmException, CertificateException,
+ KeyManagementException {
+ Preconditions.checkNotNull(underlyingHttpClientBuilder, "httpClientBuilder should not be null");
+ if (trustStoreType == null || trustStoreType.isEmpty()) {
+ trustStoreType = KeyStore.getDefaultType();
+ }
+ KeyStore sslTrustStore = KeyStore.getInstance(trustStoreType);
+ try (FileInputStream fis = new FileInputStream(trustStorePath)) {
+ sslTrustStore.load(fis, trustStorePasswd.toCharArray());
+ }
+
+ SSLContext sslContext =
+ SSLContexts.custom().setTrustManagerFactoryAlgorithm(trustStoreAlgorithm).
+ loadTrustMaterial(sslTrustStore, null).build();
+ SSLConnectionSocketFactory socketFactory =
+ new SSLConnectionSocketFactory(sslContext, new DefaultHostnameVerifier(null));
+ final Registry<ConnectionSocketFactory> registry =
+ RegistryBuilder.<ConnectionSocketFactory> create().register("https", socketFactory)
+ .build();
+ underlyingHttpClientBuilder.setConnectionManager(new BasicHttpClientConnectionManager(registry));
+ return new THttpClient(httpsUrl, underlyingHttpClientBuilder.build());
+ }
+
// Using endpoint identification algorithm as HTTPS enables us to do
// CNAMEs/subjectAltName verification
private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
diff --git a/standalone-metastore/metastore-common/src/test/resources/log4j2.properties b/standalone-metastore/metastore-common/src/test/resources/log4j2.properties
index 32f9b38404..bd4847c51f 100644
--- a/standalone-metastore/metastore-common/src/test/resources/log4j2.properties
+++ b/standalone-metastore/metastore-common/src/test/resources/log4j2.properties
@@ -22,8 +22,17 @@ appenders = console
appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{5} - %msg%n
rootLogger.level = debug
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT
+
+loggers = HttpClient, JettyHttpServer
+
+logger.HttpClient.name = org.apache.http.client
+logger.HttpClient.level = INFO
+
+logger.JettyHttpServer.name = org.eclipse.jetty.server
+logger.JettyHttpServer.level = INFO
+
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 6831e225a6..cf0b1ec8c1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.metastore;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
import org.apache.commons.cli.OptionBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager
import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.LogUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -51,10 +55,24 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TServlet;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
+
+import org.eclipse.jetty.security.ConstraintMapping;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.security.Constraint;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +85,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -98,6 +114,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
private static ZooKeeperHiveHelper zooKeeperHelper = null;
private static String msHost = null;
+ private static ThriftServer thriftServer;
public static boolean isRenameAllowed(Database srcDB, Database destDB) {
if (!srcDB.getName().equalsIgnoreCase(destDB.getName())) {
@@ -195,7 +212,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
.create('p'));
}
-
@Override
public void parse(String[] args) {
super.parse(args);
@@ -230,6 +246,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
+  /**
+   * Common abstraction over the HTTP and the binary Thrift server of the HiveMetastore.
+   */
+  private interface ThriftServer {
+    /** Starts the underlying server. */
+    void start() throws Throwable;
+
+    /** Tells whether the underlying server is currently running. */
+    boolean isRunning();
+  }
+
/**
* @param args
*/
@@ -346,21 +371,156 @@ public class HiveMetaStore extends ThriftHiveMetastore {
startMetaStore(port, bridge, conf, false, null);
}
- /**
- * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
- *
- * @param port The port on which the Thrift server will start to serve
- * @param bridge
- * @param conf Configuration overrides
- * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.)
- * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true,
- * when all of the background threads are scheduled. Useful for testing purposes to wait
- * until the MetaStore is fully initialized.
- * @throws Throwable
- */
- public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
- Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
- isMetaStoreRemote = true;
+ // TODO: Is it worth trying to use a server that supports HTTP/2?
+ // Does the Thrift http client support this?
+
+  /**
+   * Builds the embedded Jetty server that exposes the metastore Thrift API over HTTP(S).
+   *
+   * <p>Despite its name this method does not start the server: it configures the worker thread
+   * pool, the (optionally SSL) connector, the Thrift servlet and the HTTP method constraints, and
+   * returns a {@link ThriftServer} whose {@code start()} actually starts Jetty.
+   *
+   * @param port port the HTTP connector is bound to
+   * @param conf metastore configuration
+   * @return handle used to start the server and to query whether it is running
+   * @throws Exception if any step of the setup fails, for example the keytab login
+   */
+  private static ThriftServer startHttpMetastore(int port, Configuration conf)
+      throws Exception {
+    LOG.info("Attempting to start http metastore server on port: {}", port);
+
+    // This check is likely pointless, especially with the current state of the http
+    // servlet which respects whatever comes in. Putting this in place for the moment
+    // only to enable testing on an otherwise secure cluster.
+    LOG.info(" Checking if security is enabled");
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Logging in via keytab while starting HTTP metastore");
+      // Handle renewal
+      String kerberosName = SecurityUtil.getServerPrincipal(
+          MetastoreConf.getVar(conf, ConfVars.KERBEROS_PRINCIPAL), "0.0.0.0");
+      String keyTabFile = MetastoreConf.getVar(conf, ConfVars.KERBEROS_KEYTAB_FILE);
+      UserGroupInformation.loginUserFromKeytab(kerberosName, keyTabFile);
+    } else {
+      LOG.info("Security is not enabled. Not logging in via keytab");
+    }
+
+    final int minWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MIN_THREADS);
+    final int maxWorkerThreads = MetastoreConf.getIntVar(conf, ConfVars.SERVER_MAX_THREADS);
+    // Server thread pool
+    // Start with minWorkerThreads, expand till maxWorkerThreads and reject
+    // subsequent requests
+    final String threadPoolNamePrefix = "HiveMetastore-HttpHandler-Pool";
+    ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
+        minWorkerThreads, maxWorkerThreads, 60, TimeUnit.SECONDS,
+        new SynchronousQueue<>(), new ThreadFactory() {
+          @Override
+          public Thread newThread(@NotNull Runnable r) {
+            Thread newThread = new Thread(r);
+            newThread.setName(threadPoolNamePrefix + ": Thread-" + newThread.getId());
+            return newThread;
+          }
+        });
+    ExecutorThreadPool threadPool = new ExecutorThreadPool(workerPool);
+    // HTTP Server
+    final Server server = new Server(threadPool);
+    server.setStopAtShutdown(true);
+
+    final HttpConfiguration httpServerConf = new HttpConfiguration();
+    httpServerConf.setRequestHeaderSize(
+        MetastoreConf.getIntVar(conf, ConfVars.METASTORE_THRIFT_HTTP_REQUEST_HEADER_SIZE));
+    httpServerConf.setResponseHeaderSize(
+        MetastoreConf.getIntVar(conf, ConfVars.METASTORE_THRIFT_HTTP_RESPONSE_HEADER_SIZE));
+
+    final HttpConnectionFactory http = new HttpConnectionFactory(httpServerConf);
+
+    final boolean useSsl = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
+    final ServerConnector connector;
+    if (useSsl) {
+      String keyStorePath = MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_PATH).trim();
+      if (keyStorePath.isEmpty()) {
+        throw new IllegalArgumentException(ConfVars.SSL_KEYSTORE_PATH.toString()
+            + " Not configured for SSL connection");
+      }
+      String keyStorePassword =
+          MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD);
+      String keyStoreType =
+          MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
+      String keyStoreAlgorithm =
+          MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
+
+      SslContextFactory sslContextFactory = new SslContextFactory();
+      String[] excludedProtocols =
+          MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(",");
+      LOG.info("HTTP Server SSL: adding excluded protocols: {}",
+          Arrays.toString(excludedProtocols));
+      sslContextFactory.addExcludeProtocols(excludedProtocols);
+      LOG.info("HTTP Server SSL: SslContextFactory.getExcludeProtocols = {}",
+          Arrays.toString(sslContextFactory.getExcludeProtocols()));
+      sslContextFactory.setKeyStorePath(keyStorePath);
+      sslContextFactory.setKeyStorePassword(keyStorePassword);
+      sslContextFactory.setKeyStoreType(keyStoreType);
+      sslContextFactory.setKeyManagerFactoryAlgorithm(keyStoreAlgorithm);
+      connector = new ServerConnector(server, sslContextFactory, http);
+    } else {
+      connector = new ServerConnector(server, http);
+    }
+    connector.setPort(port);
+    connector.setReuseAddress(true);
+    // TODO: What should the idle timeout be for the metastore? Currently it is 30 minutes
+    long maxIdleTimeout = MetastoreConf.getTimeVar(conf, ConfVars.METASTORE_THRIFT_HTTP_MAX_IDLE_TIME,
+        TimeUnit.MILLISECONDS);
+    connector.setIdleTimeout(maxIdleTimeout);
+    // TODO: AcceptQueueSize needs to be higher for HMS
+    connector.setAcceptQueueSize(maxWorkerThreads);
+    // TODO: Connection keepalive configuration?
+
+    server.addConnector(connector);
+
+    // NOTE(review): SERVER_MAX_MESSAGE_SIZE used to be read here but was never applied; the
+    // protocol factories below carry no message size limits. Confirm whether the limit should
+    // be enforced for the HTTP transport as well.
+    boolean useCompactProtocol = MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_COMPACT_PROTOCOL);
+    final TProtocolFactory protocolFactory;
+    if (useCompactProtocol) {
+      protocolFactory = new TCompactProtocol.Factory();
+    } else {
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+
+    HMSHandler baseHandler = new HMSHandler("new db based metaserver",
+        conf);
+    IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
+    TProcessor processor = new ThriftHiveMetastore.Processor<>(handler);
+    LOG.info("Starting DB backed MetaStore Server with generic processor");
+    TServlet thriftHttpServlet = new HmsThriftHttpServlet(processor, protocolFactory);
+
+    boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL);
+    HMSHandler.LOG.info("Direct SQL optimization = {}", directSqlEnabled);
+
+    String httpPath =
+        MetaStoreUtils.getHttpPath(
+            MetastoreConf.getVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_HTTP_PATH));
+
+    ServletContextHandler context = new ServletContextHandler(
+        ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS);
+
+    // Tons of stuff skipped as compared to HS2:
+    // Sessions, XSRF, Compression, path configuration, etc.
+    constraintHttpMethods(context, false);
+    server.setHandler(context);
+
+    context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
+
+    return new ThriftServer() {
+      @Override
+      public void start() throws Throwable {
+        HMSHandler.LOG.debug("Starting HTTPServer for HMS");
+        // setStopAtShutdown(true) was already applied when the server was created above.
+        server.start();
+        HMSHandler.LOG.info("Started the new HTTPServer for metastore on port [{}]...", port);
+        HMSHandler.LOG.info("Options.minWorkerThreads = {}", minWorkerThreads);
+        HMSHandler.LOG.info("Options.maxWorkerThreads = {}", maxWorkerThreads);
+        HMSHandler.LOG.info("Enable SSL = {}", useSsl);
+      }
+
+      @Override
+      public boolean isRunning() {
+        // server is a captured, always non-null local; no null check needed.
+        return server.isRunning();
+      }
+    };
+  }
+
+ private static ThriftServer startBinaryMetastore(int port, HadoopThriftAuthBridge bridge,
+ Configuration conf) throws Throwable {
// Server will create new threads up to max as necessary. After an idle
// period, it will destroy threads to keep the number of threads in the
// pool to min.
@@ -396,12 +556,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
inputProtoFactory = new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize);
}
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
-
TServerSocket serverSocket;
-
if (useSasl) {
processor = saslServer.wrapProcessor(
- new ThriftHiveMetastore.Processor<>(handler));
+ new ThriftHiveMetastore.Processor<>(handler));
LOG.info("Starting DB backed MetaStore Server in Secure Mode");
} else {
// we are in unsecure mode.
@@ -430,9 +588,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
String keyStorePassword =
MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.SSL_KEYSTORE_PASSWORD);
String keyStoreType =
- MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
+ MetastoreConf.getVar(conf, ConfVars.SSL_KEYSTORE_TYPE).trim();
String keyStoreAlgorithm =
- MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
+ MetastoreConf.getVar(conf, ConfVars.SSL_KEYMANAGERFACTORY_ALGORITHM).trim();
// enable SSL support for HMS
List<String> sslVersionBlacklist = new ArrayList<>();
for (String sslVersion : MetastoreConf.getVar(conf, ConfVars.SSL_PROTOCOL_BLACKLIST).split(",")) {
@@ -488,14 +646,71 @@ public class HiveMetaStore extends ThriftHiveMetastore {
};
tServer.setServerEventHandler(tServerEventHandler);
- LOG.info("Started the new metaserver on port [" + port
- + "]...");
- LOG.info("Options.minWorkerThreads = "
- + minWorkerThreads);
- LOG.info("Options.maxWorkerThreads = "
- + maxWorkerThreads);
- LOG.info("TCP keepalive = " + tcpKeepAlive);
- LOG.info("Enable SSL = " + useSSL);
+    return new ThriftServer() {
+      /**
+       * Logs the effective server options and then hands the calling thread over to the
+       * Thrift server.
+       */
+      @Override
+      public void start() throws Throwable {
+        // TServer.serve() runs the accept loop on the calling thread and only returns once
+        // the server stops, so the startup messages must be logged before it is called
+        // (as the code did before this refactoring); otherwise they would only show up at
+        // shutdown.
+        HMSHandler.LOG.info("Started the new metaserver on port [{}]...", port);
+        HMSHandler.LOG.info("Options.minWorkerThreads = {}", minWorkerThreads);
+        HMSHandler.LOG.info("Options.maxWorkerThreads = {}", maxWorkerThreads);
+        HMSHandler.LOG.info("TCP keepalive = {}", tcpKeepAlive);
+        HMSHandler.LOG.info("Enable SSL = {}", useSSL);
+        tServer.serve();
+      }
+
+      /** Reports whether the wrapped Thrift server is currently serving requests. */
+      @Override
+      public boolean isRunning() {
+        return tServer != null && tServer.isServing();
+      }
+    };
+ }
+
+  /**
+   * Installs a {@link ConstraintSecurityHandler} on {@code ctxHandler} that constrains the
+   * TRACE method and, unless {@code allowOptionsMethod} is set, the OPTIONS method on all paths.
+   *
+   * @param ctxHandler servlet context that receives the security handler
+   * @param allowOptionsMethod when true only TRACE is constrained; when false OPTIONS is
+   *                           constrained as well
+   */
+  private static void constraintHttpMethods(ServletContextHandler ctxHandler, boolean allowOptionsMethod) {
+    // One constraint shared by every mapping: authentication required, no roles named.
+    // NOTE(review): presumably Jetty rejects the constrained methods as a result - confirm.
+    Constraint authConstraint = new Constraint();
+    authConstraint.setAuthenticate(true);
+
+    ConstraintMapping traceMapping = new ConstraintMapping();
+    traceMapping.setConstraint(authConstraint);
+    traceMapping.setMethod("TRACE");
+    traceMapping.setPathSpec("/*");
+
+    final ConstraintMapping[] mappings;
+    if (allowOptionsMethod) {
+      mappings = new ConstraintMapping[] {traceMapping};
+    } else {
+      ConstraintMapping optionsMapping = new ConstraintMapping();
+      optionsMapping.setConstraint(authConstraint);
+      optionsMapping.setMethod("OPTIONS");
+      optionsMapping.setPathSpec("/*");
+      mappings = new ConstraintMapping[] {traceMapping, optionsMapping};
+    }
+
+    ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
+    securityHandler.setConstraintMappings(mappings);
+    ctxHandler.setSecurityHandler(securityHandler);
+  }
+ /**
+ * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
+ *
+ * @param port The port on which the Thrift server will start to serve
+ * @param bridge
+ * @param conf Configuration overrides
+ * @param startMetaStoreThreads Start the background threads (initiator, cleaner, statsupdater, etc.)
+ * @param startedBackgroundThreads If startMetaStoreThreads is true, this AtomicBoolean will be switched to true,
+ * when all of the background threads are scheduled. Useful for testing purposes to wait
+ * until the MetaStore is fully initialized.
+ * @throws Throwable
+ */
+ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
+ Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
+ isMetaStoreRemote = true;
+ String transportMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "binary");
+ boolean isHttpTransport = transportMode.equalsIgnoreCase("http");
+ if (isHttpTransport) {
+ thriftServer = startHttpMetastore(port, conf);
+ } else {
+ thriftServer = startBinaryMetastore(port, bridge, conf);
+ }
+
logCompactionParameters(conf);
boolean directSqlEnabled = MetastoreConf.getBoolVar(conf, ConfVars.TRY_DIRECT_SQL);
@@ -507,7 +722,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
AtomicBoolean startedServing = new AtomicBoolean();
startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing,
isMetastoreHousekeepingLeader(conf, getServerHostName()), startedBackgroundThreads);
- signalOtherThreadsToStart(tServer, metaStoreThreadsLock, startCondition, startedServing);
+ signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock, startCondition, startedServing);
}
// If dynamic service discovery through ZooKeeper is enabled, add this server to the ZooKeeper.
@@ -526,7 +741,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- tServer.serve();
+ thriftServer.start();
}
private static void logCompactionParameters(Configuration conf) {
@@ -594,7 +809,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- private static void signalOtherThreadsToStart(final TServer server, final Lock startLock,
+ private static void signalOtherThreadsToStart(final ThriftServer thriftServer, final Lock startLock,
final Condition startCondition,
final AtomicBoolean startedServing) {
// A simple thread to wait until the server has started and then signal the other threads to
@@ -608,7 +823,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
} catch (InterruptedException e) {
LOG.warn("Signalling thread was interrupted: " + e.getMessage());
}
- } while (!server.isServing());
+ } while (!thriftServer.isRunning());
startLock.lock();
try {
startedServing.set(true);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java
new file mode 100644
index 0000000000..e58bd5634b
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HmsThriftHttpServlet.java
@@ -0,0 +1,113 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Enumeration;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServlet;
+
+/**
+ * Thrift servlet for the Hive Metastore HTTP transport.
+ *
+ * <p>Every POST must carry the user name in the {@link MetaStoreUtils#USER_NAME_HTTP_HEADER}
+ * header. The request is then processed by {@link TServlet} inside a
+ * {@link UserGroupInformation#doAs} call for that user. The header value is taken as-is; this
+ * servlet performs no authentication of its own.
+ */
+public class HmsThriftHttpServlet extends TServlet {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HmsThriftHttpServlet.class);
+
+  /** Name of the HTTP header carrying the calling user's name. */
+  private static final String X_USER = MetaStoreUtils.USER_NAME_HTTP_HEADER;
+
+  /** Snapshot of {@link UserGroupInformation#isSecurityEnabled()} taken at construction time. */
+  private final boolean isSecurityEnabled;
+
+  /**
+   * Creates a servlet with separate protocol factories for requests and responses.
+   *
+   * @param processor Thrift processor handling the calls
+   * @param inProtocolFactory protocol factory used to read requests
+   * @param outProtocolFactory protocol factory used to write responses
+   */
+  public HmsThriftHttpServlet(TProcessor processor,
+      TProtocolFactory inProtocolFactory, TProtocolFactory outProtocolFactory) {
+    super(processor, inProtocolFactory, outProtocolFactory);
+    // This should ideally be receiving an instance of the Configuration which is used for the check
+    isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+  }
+
+  /**
+   * Creates a servlet that uses the same protocol factory for requests and responses.
+   *
+   * @param processor Thrift processor handling the calls
+   * @param protocolFactory protocol factory used for both directions
+   */
+  public HmsThriftHttpServlet(TProcessor processor,
+      TProtocolFactory protocolFactory) {
+    super(processor, protocolFactory);
+    isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
+  }
+
+  /**
+   * Handles a Thrift call on behalf of the user named in the {@code X_USER} header.
+   *
+   * <p>Responds with 403 (Forbidden) when the header is missing or empty.
+   *
+   * @throws ServletException if processing is interrupted or fails with a runtime exception
+   * @throws IOException if sending the error response or running the request fails with an
+   *                     I/O error
+   */
+  @Override
+  protected void doPost(HttpServletRequest request,
+      HttpServletResponse response) throws ServletException, IOException {
+
+    if (LOG.isDebugEnabled()) {
+      // NOTE(review): this prints every header value, including any credentials they carry.
+      LOG.debug("Logging headers in request");
+      // The servlet API allows containers to return null here.
+      Enumeration<String> headerNames = request.getHeaderNames();
+      while (headerNames != null && headerNames.hasMoreElements()) {
+        String headerName = headerNames.nextElement();
+        LOG.debug("Header: [{}], Value: [{}]", headerName,
+            request.getHeader(headerName));
+      }
+    }
+    String userFromHeader = request.getHeader(X_USER);
+    if (userFromHeader == null || userFromHeader.isEmpty()) {
+      LOG.error("No user header: {} found", X_USER);
+      response.sendError(HttpServletResponse.SC_FORBIDDEN,
+          "Header: " + X_USER + " missing in the request");
+      return;
+    }
+
+    // TODO: These should ideally be in some kind of a Cache with Weak references.
+    // If HMS were to set up some kind of a session, this would go into the session by having
+    // this filter work with a custom Processor / or set the username into the session
+    // as is done for HS2.
+    // In case of HMS, it looks like each request is independent, and there is no session
+    // information, so the UGI needs to be set up in the Connection layer itself.
+    UserGroupInformation clientUgi;
+    // Temporary, and useless for now. Here only to allow this to work on an otherwise kerberized
+    // server.
+    if (isSecurityEnabled) {
+      LOG.info("Creating proxy user for: {}", userFromHeader);
+      clientUgi = UserGroupInformation.createProxyUser(userFromHeader, UserGroupInformation.getLoginUser());
+    } else {
+      LOG.info("Creating remote user for: {}", userFromHeader);
+      clientUgi = UserGroupInformation.createRemoteUser(userFromHeader);
+    }
+
+    PrivilegedExceptionAction<Void> action = () -> {
+      HmsThriftHttpServlet.super.doPost(request, response);
+      return null;
+    };
+
+    try {
+      clientUgi.doAs(action);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so the container's worker thread can observe it.
+      Thread.currentThread().interrupt();
+      LOG.error("Exception when executing http request as user: " + clientUgi.getUserName(),
+          e);
+      throw new ServletException(e);
+    } catch (RuntimeException e) {
+      LOG.error("Exception when executing http request as user: " + clientUgi.getUserName(),
+          e);
+      throw new ServletException(e);
+    }
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java
similarity index 54%
copy from standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
copy to standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java
index 415988dea3..6e48607d33 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveHttpMetaStore.java
@@ -15,50 +15,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hive.metastore;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.experimental.categories.Category;
-
@Category(MetastoreCheckinTest.class)
-public class TestRemoteHiveMetaStore extends TestHiveMetaStore {
- private static boolean isServerStarted = false;
- protected static int port;
+public class TestRemoteHiveHttpMetaStore extends TestRemoteHiveMetaStore {
- public TestRemoteHiveMetaStore() {
- super();
- isThriftClient = true;
- }
-
- @Before
- public void setUp() throws Exception {
- super.setUp();
-
- if (isServerStarted) {
- Assert.assertNotNull("Unable to connect to the MetaStore server", client);
- return;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(TestRemoteHiveHttpMetaStore.class);
- port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
- conf);
- System.out.println("Starting MetaStore Server on port " + port);
- isServerStarted = true;
-
- // This is default case with setugi off for both client and server
- client = createClient();
+ @Override
+ public void start() throws Exception {
+ MetastoreConf.setVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "http");
+ LOG.info("Attempting to start test remote metastore in http mode");
+ super.start();
+ LOG.info("Successfully started test remote metastore in http mode");
}
@Override
protected HiveMetaStoreClient createClient() throws Exception {
- MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" + port);
- MetastoreConf.setBoolVar(conf, ConfVars.EXECUTE_SET_UGI, false);
- return new HiveMetaStoreClient(conf);
+ MetastoreConf.setVar(conf, ConfVars.METASTORE_CLIENT_THRIFT_TRANSPORT_MODE, "http");
+ return super.createClient();
}
-}
\ No newline at end of file
+}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
index 415988dea3..9b07321529 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestRemoteHiveMetaStore.java
@@ -45,7 +45,10 @@ public class TestRemoteHiveMetaStore extends TestHiveMetaStore {
Assert.assertNotNull("Unable to connect to the MetaStore server", client);
return;
}
+ start();
+ }
+ protected void start() throws Exception {
port = MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
conf);
System.out.println("Starting MetaStore Server on port " + port);
diff --git a/standalone-metastore/pom.xml b/standalone-metastore/pom.xml
index 8b242263ce..48fbe91487 100644
--- a/standalone-metastore/pom.xml
+++ b/standalone-metastore/pom.xml
@@ -103,6 +103,7 @@
<spotbugs.version>4.0.3</spotbugs.version>
<caffeine.version>2.8.4</caffeine.version>
<slf4j.version>1.7.30</slf4j.version>
+ <httpcomponents.core.version>4.4.13</httpcomponents.core.version>
<!-- Thrift properties -->
<thrift.home>you-must-set-this-to-run-thrift</thrift.home>
<thrift.gen.dir>${basedir}/src/gen/thrift</thrift.gen.dir>
@@ -361,6 +362,11 @@
<scope>runtime</scope>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcomponents.core.version}</version>
+ </dependency>
<!-- test scope dependencies -->
<dependency>
<groupId>junit</groupId>