You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by am...@apache.org on 2016/04/15 13:46:39 UTC
hive git commit: HIVE-13415 : Decouple Sessions from thrift binary transport (Rajat Khandelwal, reviewed by Szehon Ho)
Repository: hive
Updated Branches:
refs/heads/master 3fec161da -> b30fe72e0
HIVE-13415 : Decouple Sessions from thrift binary transport (Rajat Khandelwal, reviewed by Szehon Ho)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b30fe72e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b30fe72e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b30fe72e
Branch: refs/heads/master
Commit: b30fe72e021a4adb83436c4f19b0f400d1f44edf
Parents: 3fec161
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Fri Apr 15 17:16:26 2016 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Fri Apr 15 17:16:26 2016 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../cli/thrift/ThriftBinaryCLIService.java | 64 ++++++++-
.../service/cli/thrift/ThriftCLIService.java | 57 --------
.../cli/TestRetryingThriftCLIServiceClient.java | 130 +++++++++++++++----
4 files changed, 171 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c7e5b33..5cf1609 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2304,6 +2304,8 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
"The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."),
+ HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT("hive.server2.close.session.on.disconnect", true,
+ "Session will be closed when connection is closed. Set this to false to have session outlive its parent connection."),
HIVE_SERVER2_IDLE_SESSION_TIMEOUT("hive.server2.idle.session.timeout", "7d",
new TimeValidator(TimeUnit.MILLISECONDS),
"Session will be closed when it's not accessed for this duration, which can be disabled by setting to zero or negative value."),
http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index cf575a4..d9c7b2e 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -24,16 +24,25 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
@@ -94,7 +103,60 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
// TCP Server
server = new TThreadPoolServer(sargs);
- server.setServerEventHandler(serverEventHandler);
+ server.setServerEventHandler(new TServerEventHandler() {
+ @Override
+ public ServerContext createContext(
+ TProtocol input, TProtocol output) {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
+ return new ThriftCLIServerContext();
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext,
+ TProtocol input, TProtocol output) {
+ Metrics metrics = MetricsFactory.getInstance();
+ if (metrics != null) {
+ try {
+ metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
+ } catch (Exception e) {
+ LOG.warn("Error Reporting JDO operation to Metrics system", e);
+ }
+ }
+ ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
+ SessionHandle sessionHandle = context.getSessionHandle();
+ if (sessionHandle != null) {
+ LOG.info("Session disconnected without closing properly. ");
+ try {
+ boolean close = cliService.getSessionManager().getSession(sessionHandle).getHiveConf()
+ .getBoolVar(ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT);
+ LOG.info((close ? "" : "Not ") + "Closing the session: " + sessionHandle);
+ if (close) {
+ cliService.closeSession(sessionHandle);
+ }
+ } catch (HiveSQLException e) {
+ LOG.warn("Failed to close session: " + e, e);
+ }
+ }
+ }
+
+ @Override
+ public void preServe() {
+ }
+
+ @Override
+ public void processContext(ServerContext serverContext,
+ TTransport input, TTransport output) {
+ currentServerContext.set(serverContext);
+ }
+ });
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index be9833d..e789a38 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -28,9 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.security.auth.login.LoginException;
-import org.apache.hadoop.hive.common.metrics.common.Metrics;
-import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
-import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.common.ServerUtils;
@@ -38,7 +35,6 @@ import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.ServiceException;
-import org.apache.hive.service.ServiceUtils;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.TSetIpAddressProcessor;
import org.apache.hive.service.cli.CLIService;
@@ -97,11 +93,8 @@ import org.apache.hive.service.rpc.thrift.TStatus;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
-import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +127,6 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
protected int maxWorkerThreads;
protected long workerKeepAliveTime;
- protected TServerEventHandler serverEventHandler;
protected ThreadLocal<ServerContext> currentServerContext;
static class ThriftCLIServerContext implements ServerContext {
@@ -153,55 +145,6 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
super(serviceName);
this.cliService = service;
currentServerContext = new ThreadLocal<ServerContext>();
- serverEventHandler = new TServerEventHandler() {
- @Override
- public ServerContext createContext(
- TProtocol input, TProtocol output) {
- Metrics metrics = MetricsFactory.getInstance();
- if (metrics != null) {
- try {
- metrics.incrementCounter(MetricsConstant.OPEN_CONNECTIONS);
- metrics.incrementCounter(MetricsConstant.CUMULATIVE_CONNECTION_COUNT);
- } catch (Exception e) {
- LOG.warn("Error Reporting JDO operation to Metrics system", e);
- }
- }
- return new ThriftCLIServerContext();
- }
-
- @Override
- public void deleteContext(ServerContext serverContext,
- TProtocol input, TProtocol output) {
- Metrics metrics = MetricsFactory.getInstance();
- if (metrics != null) {
- try {
- metrics.decrementCounter(MetricsConstant.OPEN_CONNECTIONS);
- } catch (Exception e) {
- LOG.warn("Error Reporting JDO operation to Metrics system", e);
- }
- }
- ThriftCLIServerContext context = (ThriftCLIServerContext) serverContext;
- SessionHandle sessionHandle = context.getSessionHandle();
- if (sessionHandle != null) {
- LOG.info("Session disconnected without closing properly, close it now");
- try {
- cliService.closeSession(sessionHandle);
- } catch (HiveSQLException e) {
- LOG.warn("Failed to close session: " + e, e);
- }
- }
- }
-
- @Override
- public void preServe() {
- }
-
- @Override
- public void processContext(ServerContext serverContext,
- TTransport input, TTransport output) {
- currentServerContext.set(serverContext);
- }
- };
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/b30fe72e/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
index 3bd82e6..d36f6c0 100644
--- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
+++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java
@@ -19,13 +19,17 @@
package org.apache.hive.service.cli;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.Service;
import org.apache.hive.service.auth.HiveAuthFactory;
+import org.apache.hive.service.cli.session.HiveSession;
import org.apache.hive.service.cli.thrift.RetryingThriftCLIServiceClient;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+
+import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Method;
@@ -41,6 +45,38 @@ import static org.junit.Assert.*;
*/
public class TestRetryingThriftCLIServiceClient {
protected static ThriftCLIService service;
+ private HiveConf hiveConf;
+ private HiveServer2 server;
+
+ @Before
+ public void init() {
+ hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
+ }
+
+ private void startHiveServer() throws InterruptedException {
+ // Start hive server2
+ server = new HiveServer2();
+ server.init(hiveConf);
+ server.start();
+ Thread.sleep(5000);
+ System.out.println("## HiveServer started");
+ }
+
+ private void stopHiveServer() {
+ if (server != null) {
+ // kill server
+ server.stop();
+ }
+ }
static class RetryingThriftCLIServiceClientTest extends RetryingThriftCLIServiceClient {
int callCount = 0;
@@ -74,31 +110,14 @@ public class TestRetryingThriftCLIServiceClient {
return super.connect(conf);
}
}
+
@Test
public void testRetryBehaviour() throws Exception {
- // Start hive server2
- HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, "localhost");
- hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 15000);
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
- hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString());
- hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE, "binary");
- hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_RETRY_LIMIT, 3);
- hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
- hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS, 10);
- hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, "1s");
-
- final HiveServer2 server = new HiveServer2();
- server.init(hiveConf);
- server.start();
- Thread.sleep(5000);
- System.out.println("## HiveServer started");
-
+ startHiveServer();
// Check if giving invalid address causes retry in connection attempt
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, 17000);
try {
- CLIServiceClient cliServiceClient =
- RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+ RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
fail("Expected to throw exception for invalid port");
} catch (HiveSQLException sqlExc) {
assertTrue(sqlExc.getCause() instanceof TTransportException);
@@ -112,16 +131,14 @@ public class TestRetryingThriftCLIServiceClient {
= RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
System.out.println("## Created client");
- // kill server
- server.stop();
+ stopHiveServer();
Thread.sleep(5000);
// submit few queries
try {
- Map<String, String> confOverlay = new HashMap<String, String>();
RetryingThriftCLIServiceClientTest.handlerInst.callCount = 0;
RetryingThriftCLIServiceClientTest.handlerInst.connectCount = 0;
- SessionHandle session = cliServiceClient.openSession("anonymous", "anonymous");
+ cliServiceClient.openSession("anonymous", "anonymous");
} catch (HiveSQLException exc) {
exc.printStackTrace();
assertTrue(exc.getCause() instanceof TException);
@@ -131,4 +148,69 @@ public class TestRetryingThriftCLIServiceClient {
cliServiceClient.closeTransport();
}
}
+
+ @Test
+ public void testTransportClose() throws InterruptedException, HiveSQLException {
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 0);
+ try {
+ startHiveServer();
+ RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
+ = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+ client.closeTransport();
+ try {
+ client.openSession("anonymous", "anonymous");
+ fail("Shouldn't be able to open session when transport is closed.");
+ } catch(HiveSQLException ignored) {
+
+ }
+ } finally {
+ hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_CONNECTION_RETRY_LIMIT, 3);
+ stopHiveServer();
+ }
+ }
+
+ @Test
+ public void testSessionLifeAfterTransportClose() throws InterruptedException, HiveSQLException {
+ try {
+ startHiveServer();
+ CLIService service = null;
+ for (Service s : server.getServices()) {
+ if (s instanceof CLIService) {
+ service = (CLIService) s;
+ }
+ }
+ if (service == null) {
+ service = new CLIService(server);
+ }
+ RetryingThriftCLIServiceClient.CLIServiceClientWrapper client
+ = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+ Map<String, String> conf = new HashMap<>();
+ conf.put(HiveConf.ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT.varname, "false");
+ SessionHandle sessionHandle = client.openSession("anonymous", "anonymous", conf);
+ assertNotNull(sessionHandle);
+ HiveSession session = service.getSessionManager().getSession(sessionHandle);
+ OperationHandle op1 = session.executeStatementAsync("show databases", null);
+ assertNotNull(op1);
+ client.closeTransport();
+ // Verify that session wasn't closed on transport close.
+ assertEquals(session, service.getSessionManager().getSession(sessionHandle));
+ // Should be able to execute without failure in the session whose transport has been closed.
+ OperationHandle op2 = session.executeStatementAsync("show databases", null);
+ assertNotNull(op2);
+ // Make new client, since transport was closed for the last one.
+ client = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf);
+ client.closeSession(sessionHandle);
+ // operations will be lost once owning session is closed.
+ for (OperationHandle op: new OperationHandle[]{op1, op2}) {
+ try {
+ client.getOperationStatus(op);
+ fail("Should have failed.");
+ } catch (HiveSQLException ignored) {
+
+ }
+ }
+ } finally {
+ stopHiveServer();
+ }
+ }
}