You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by am...@apache.org on 2015/09/10 06:52:45 UTC
hive git commit: HIVE-11482 : Adds retrying thrift client for
HiveServer2 (Akshay Goyal, reviewed by Amareshwari)
Repository: hive
Updated Branches:
refs/heads/master d94c0f65d -> 9b11caff8
HIVE-11482 : Adds retrying thrift client for HiveServer2 (Akshay Goyal, reviewed by Amareshwari)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9b11caff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9b11caff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9b11caff
Branch: refs/heads/master
Commit: 9b11caff8b61697c88caa1ed5606c665624f3290
Parents: d94c0f6
Author: Akshay Goyal <ak...@gmail.com>
Authored: Thu Sep 10 10:22:31 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Thu Sep 10 10:22:31 2015 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 +
.../thrift/RetryingThriftCLIServiceClient.java | 331 +++++++++++++++++++
.../cli/TestRetryingThriftCLIServiceClient.java | 133 ++++++++
3 files changed, 475 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9b11caff/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a00079..d2c5885 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2022,6 +2022,17 @@ public class HiveConf extends Configuration {
"Session will be considered to be idle only if there is no activity, and there is no pending operation.\n" +
" This setting takes effect only if session idle timeout (hive.server2.idle.session.timeout) and checking\n" +
"(hive.server2.session.check.interval) are enabled."),
+ HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT("hive.server2.thrift.client.retry.limit", 1,"Number of retries upon " +
+ "failure of Thrift HiveServer2 calls"),
+ HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT("hive.server2.thrift.client.connect.retry.limit", 1,"Number of " +
+ "retries while opening a connection to HiveServe2"),
+ HIVE_SERVER2_THRIFT_CLIENT_RETRY_DELAY_SECONDS("hive.server2.thrift.client.retry.delay.seconds", "1s",
+ new TimeValidator(TimeUnit.SECONDS), "Number of seconds for the HiveServer2 thrift client to wait between " +
+ "consecutive connection attempts. Also specifies the time to wait between retrying thrift calls upon failures"),
+ HIVE_SERVER2_THRIFT_CLIENT_USER("hive.server2.thrift.client.user", "anonymous","Username to use against thrift" +
+ " client"),
+ HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " +
+ "thrift client"),
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
"Comma separated list of non-SQL Hive commands users are authorized to execute"),
http://git-wip-us.apache.org/repos/asf/hive/blob/9b11caff/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
new file mode 100644
index 0000000..4bd7336
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli.thrift;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.auth.KerberosSaslHelper;
+import org.apache.hive.service.auth.PlainSaslHelper;
+import org.apache.hive.service.cli.*;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import javax.security.sasl.SaslException;
+import java.lang.reflect.*;
+import java.net.SocketException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * RetryingThriftCLIServiceClient. Creates a proxy for a CLIServiceClient
+ * implementation and retries calls to it on failure.
+ */
+public class RetryingThriftCLIServiceClient implements InvocationHandler {
+ public static final Log LOG = LogFactory.getLog(RetryingThriftCLIServiceClient.class);
+ private ThriftCLIServiceClient base;
+ private final int retryLimit;
+ private final int retryDelaySeconds;
+ private HiveConf conf;
+ private TTransport transport;
+
+ public static class CLIServiceClientWrapper extends CLIServiceClient {
+ private final ICLIService cliService;
+
+ public CLIServiceClientWrapper(ICLIService icliService) {
+ cliService = icliService;
+ }
+
+ @Override
+ public SessionHandle openSession(String username, String password) throws HiveSQLException {
+ return cliService.openSession(username, password, Collections.<String, String>emptyMap());
+ }
+
+ @Override
+ public String getDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory, String owner,
+ String renewer) throws HiveSQLException {
+ return cliService.getDelegationToken(sessionHandle, authFactory, owner, renewer);
+ }
+
+ @Override
+ public void cancelDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException {
+ cliService.cancelDelegationToken(sessionHandle, authFactory, tokenStr);
+ }
+
+ @Override
+ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory authFactory,
+ String tokenStr) throws HiveSQLException {
+ cliService.renewDelegationToken(sessionHandle, authFactory, tokenStr);
+ }
+
+ @Override
+ public SessionHandle openSession(String username, String password, Map<String, String> configuration)
+ throws HiveSQLException {
+ return cliService.openSession(username, password, configuration);
+ }
+
+ @Override
+ public SessionHandle openSessionWithImpersonation(String username,
+ String password,
+ Map<String, String> configuration,
+ String delegationToken) throws HiveSQLException {
+ return cliService.openSessionWithImpersonation(username, password, configuration, delegationToken);
+ }
+
+ @Override
+ public void closeSession(SessionHandle sessionHandle) throws HiveSQLException {
+ cliService.closeSession(sessionHandle);
+ }
+
+ @Override
+ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType) throws HiveSQLException {
+ return cliService.getInfo(sessionHandle, getInfoType);
+ }
+
+ @Override
+ public OperationHandle executeStatement(SessionHandle sessionHandle,
+ String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
+ return cliService.executeStatement(sessionHandle, statement, confOverlay);
+ }
+
+ @Override
+ public OperationHandle executeStatementAsync(SessionHandle sessionHandle,
+ String statement,
+ Map<String, String> confOverlay) throws HiveSQLException {
+ return cliService.executeStatementAsync(sessionHandle, statement, confOverlay);
+ }
+
+ @Override
+ public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException {
+ return cliService.getTypeInfo(sessionHandle);
+ }
+
+ @Override
+ public OperationHandle getCatalogs(SessionHandle sessionHandle) throws HiveSQLException {
+ return cliService.getCatalogs(sessionHandle);
+ }
+
+ @Override
+ public OperationHandle getSchemas(SessionHandle sessionHandle, String catalogName, String schemaName)
+ throws HiveSQLException {
+ return cliService.getSchemas(sessionHandle, catalogName, schemaName);
+ }
+
+ @Override
+ public OperationHandle getTables(SessionHandle sessionHandle, String catalogName, String schemaName,
+ String tableName, List<String> tableTypes) throws HiveSQLException {
+ return cliService.getTables(sessionHandle, catalogName, schemaName, tableName, tableTypes);
+ }
+
+ @Override
+ public OperationHandle getTableTypes(SessionHandle sessionHandle) throws HiveSQLException {
+ return null;
+ }
+
+ @Override
+ public OperationHandle getColumns(SessionHandle sessionHandle, String catalogName, String schemaName,
+ String tableName, String columnName) throws HiveSQLException {
+ return cliService.getColumns(sessionHandle, catalogName, schemaName, tableName, columnName);
+ }
+
+ @Override
+ public OperationHandle getFunctions(SessionHandle sessionHandle, String catalogName, String schemaName,
+ String functionName) throws HiveSQLException {
+ return cliService.getFunctions(sessionHandle, catalogName, schemaName, functionName);
+ }
+
+ @Override
+ public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException {
+ return cliService.getOperationStatus(opHandle);
+ }
+
+ @Override
+ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
+ cliService.cancelOperation(opHandle);
+ }
+
+ @Override
+ public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
+ cliService.closeOperation(opHandle);
+ }
+
+ @Override
+ public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
+ return cliService.getResultSetMetadata(opHandle);
+ }
+
+ @Override
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+ FetchType fetchType) throws HiveSQLException {
+ return cliService.fetchResults(opHandle, orientation, maxRows, fetchType);
+ }
+ }
+
+ protected RetryingThriftCLIServiceClient(HiveConf conf) {
+ this.conf = conf;
+ retryLimit = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT);
+ retryDelaySeconds = (int) conf.getTimeVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_DELAY_SECONDS,
+ TimeUnit.SECONDS);
+ }
+
+ public static CLIServiceClient newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException {
+ RetryingThriftCLIServiceClient retryClient = new RetryingThriftCLIServiceClient(conf);
+ retryClient.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT));
+ ICLIService cliService =
+ (ICLIService) Proxy.newProxyInstance(RetryingThriftCLIServiceClient.class.getClassLoader(),
+ CLIServiceClient.class.getInterfaces(), retryClient);
+ return new CLIServiceClientWrapper(cliService);
+ }
+
+ protected void connectWithRetry(int retries) throws HiveSQLException {
+ for (int i = 0 ; i < retries; i++) {
+ try {
+ connect(conf);
+ break;
+ } catch (TTransportException e) {
+ if (i + 1 == retries) {
+ throw new HiveSQLException("Unable to connect after " + retries + " retries", e);
+ }
+ LOG.warn("Connection attempt " + i, e);
+ }
+ try {
+ Thread.sleep(retryDelaySeconds * 1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted", e);
+ }
+ }
+ }
+
+ protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException, TTransportException {
+ if (transport != null && transport.isOpen()) {
+ transport.close();
+ }
+
+ String host = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ int port = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ LOG.info("Connecting to " + host + ":" + port);
+
+ transport = new TSocket(host, port);
+ ((TSocket) transport).setTimeout((int) conf.getTimeVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT,
+ TimeUnit.SECONDS) * 1000);
+ try {
+ ((TSocket) transport).getSocket().setKeepAlive(conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE));
+ } catch (SocketException e) {
+ LOG.error("Error setting keep alive to " + conf.getBoolVar(HiveConf.ConfVars.SERVER_TCP_KEEP_ALIVE), e);
+ }
+
+ String userName = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
+ String passwd = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_PASSWORD);
+
+ try {
+ transport = PlainSaslHelper.getPlainTransport(userName, passwd, transport);
+ } catch (SaslException e) {
+ LOG.error("Error creating plain SASL transport", e);
+ }
+
+ TProtocol protocol = new TBinaryProtocol(transport);
+ transport.open();
+ base = new ThriftCLIServiceClient(new TCLIService.Client(protocol));
+ LOG.info("Connected!");
+ return transport;
+ }
+
+ protected class InvocationResult {
+ final boolean success;
+ final Object result;
+ final Throwable exception;
+
+ InvocationResult(boolean success, Object result, Throwable exception) {
+ this.success = success;
+ this.result = result;
+ this.exception = exception;
+ }
+ }
+
+ protected InvocationResult invokeInternal(Method method, Object[] args) throws Throwable {
+ InvocationResult result;
+ try {
+ Object methodResult = method.invoke(base, args);
+ result = new InvocationResult(true, methodResult, null);
+ } catch (UndeclaredThrowableException e) {
+ throw e.getCause();
+ } catch (InvocationTargetException e) {
+ if (e.getCause() instanceof HiveSQLException) {
+ HiveSQLException hiveExc = (HiveSQLException) e.getCause();
+ Throwable cause = hiveExc.getCause();
+ if ((cause instanceof TApplicationException) ||
+ (cause instanceof TProtocolException) ||
+ (cause instanceof TTransportException)) {
+ result = new InvocationResult(false, null, hiveExc);
+ } else {
+ throw hiveExc;
+ }
+ } else {
+ throw e.getCause();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+ int attempts = 0;
+
+ while (true) {
+ attempts++;
+ InvocationResult invokeResult = invokeInternal(method, args);
+ if (invokeResult.success) {
+ return invokeResult.result;
+ }
+
+ // Error because of thrift client, we have to recreate base object
+ connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT));
+
+ if (attempts >= retryLimit) {
+ LOG.error(method.getName() + " failed after " + attempts + " retries.", invokeResult.exception);
+ throw invokeResult.exception;
+ }
+
+ LOG.warn("Last call ThriftCLIServiceClient." + method.getName() + " failed, attempts = " + attempts,
+ invokeResult.exception);
+ Thread.sleep(retryDelaySeconds * 1000);
+ }
+ }
+
+ public int getRetryLimit() {
+ return retryLimit;
+ }
+
+ public int getRetryDelaySeconds() {
+ return retryDelaySeconds;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b11caff/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
new file mode 100644
index 0000000..3798053
--- /dev/null
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.thrift.RetryingThriftCLIServiceClient;
+import org.apache.hive.service.cli.thrift.ThriftCLIService;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test CLI service with a retrying client. All tests should pass. This is to validate that calls
+ * are transferred successfully.
+ */
+public class TestRetryingThriftCLIServiceClient {
+ protected static ThriftCLIService service;
+
+ static class RetryingThriftCLIServiceClientTest extends RetryingThriftCLIServiceClient {
+ int callCount = 0;
+ int connectCount = 0;
+ static RetryingThriftCLIServiceClientTest handlerInst;
+
+ protected RetryingThriftCLIServiceClientTest(HiveConf conf) {
+ super(conf);
+ }
+
+ public static CLIServiceClient newRetryingCLIServiceClient(HiveConf conf) throws HiveSQLException {
+ handlerInst = new RetryingThriftCLIServiceClientTest(conf);
+ handlerInst.connectWithRetry(conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT));
+
+ ICLIService cliService =
+ (ICLIService) Proxy.newProxyInstance(RetryingThriftCLIServiceClientTest.class.getClassLoader(),
+ CLIServiceClient.class.getInterfaces(), handlerInst);
+ return new CLIServiceClientWrapper(cliService);
+ }
+
+ @Override
+ protected InvocationResult invokeInternal(Method method, Object[] args) throws Throwable {
+ System.out.println("## Calling: " + method.getName() + ", " + callCount + "/" + getRetryLimit());
+ callCount++;
+ return super.invokeInternal(method, args);
+ }
+
+ @Override
+ protected synchronized TTransport connect(HiveConf conf) throws HiveSQLException, TTransportException {
+ connectCount++;
+ return super.connect(conf);
+ }
+ }
+ @Test
+ public void testRetryBehaviour() throws Exception {
+ // Start hive server2
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
+
+ final HiveServer2 server = new HiveServer2();
+ server.init(hiveConf);
+ server.start();
+ Thread.sleep(5000);
+ System.out.println("## HiveServer started");
+
+ // Check if giving invalid address causes retry in connection attempt
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 17000);
+ try {
+ CLIServiceClient cliServiceClient =
+ RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+ fail("Expected to throw exception for invalid port");
+ } catch (HiveSQLException sqlExc) {
+ assertTrue(sqlExc.getCause() instanceof TTransportException);
+ assertTrue(sqlExc.getMessage().contains("3"));
+ }
+
+ // Reset port setting
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+ // Create client
+ CLIServiceClient cliServiceClient =
+ RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+ System.out.println("## Created client");
+
+ // kill server
+ server.stop();
+ Thread.sleep(5000);
+
+ // submit few queries
+ try {
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ RetryingThriftCLIServiceClientTest.handlerInst.callCount = 0;
+ RetryingThriftCLIServiceClientTest.handlerInst.connectCount = 0;
+ SessionHandle session = cliServiceClient.openSession("anonymous", "anonymous");
+ } catch (HiveSQLException exc) {
+ exc.printStackTrace();
+ assertTrue(exc.getCause() instanceof TException);
+ assertEquals(1, RetryingThriftCLIServiceClientTest.handlerInst.callCount);
+ assertEquals(3, RetryingThriftCLIServiceClientTest.handlerInst.connectCount);
+ }
+
+ }
+}