You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2014/09/16 21:40:52 UTC
svn commit: r1625363 [2/2] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
itests/hive-unit/src/test/java/org/apache/hive/beeline/
itests/hive-unit/src/test/java/org/apache/hive/jdbc/ jdbc/
jdbc/src/java/org/apache/hive/jdbc/ ql/src/java...
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep 16 19:40:51 2014
@@ -48,100 +48,94 @@ import org.eclipse.jetty.util.thread.Exe
public class ThriftHttpCLIService extends ThriftCLIService {
public ThriftHttpCLIService(CLIService cliService) {
- super(cliService, "ThriftHttpCLIService");
+ super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
+ /**
+ * Configure Jetty to serve http requests. Example of a client connection URL:
+ * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
+ * e.g. http://gateway:port/hive2/servlets/thrifths2/
+ */
@Override
public void run() {
try {
- // Configure Jetty to serve http requests
- // Example of a client connection URL: http://localhost:10000/servlets/thrifths2/
- // a gateway may cause actual target URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/
-
+ // Verify config validity
verifyHttpConfiguration(hiveConf);
- String portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
- if (portString != null) {
- portNum = Integer.valueOf(portString);
- } else {
- portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
- }
-
- minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
- maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getTimeVar(
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
-
- String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
-
+ // HTTP Server
httpServer = new org.eclipse.jetty.server.Server();
+
+ // Server thread pool
String threadPoolName = "HiveServer2-HttpHandler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
-
ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
httpServer.setThreadPool(threadPool);
- SelectChannelConnector connector = new SelectChannelConnector();;
+ // Connector configs
+ SelectChannelConnector connector = new SelectChannelConnector();
boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
String schemeName = useSsl ? "https" : "http";
- String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
- // Set during the init phase of HiveServer2 if auth mode is kerberos
- // UGI for the hive/_HOST (kerberos) principal
- UserGroupInformation serviceUGI = cliService.getServiceUGI();
- // UGI for the http/_HOST (SPNego) principal
- UserGroupInformation httpUGI = cliService.getHttpUGI();
-
+ // Change connector if SSL is used
if (useSsl) {
String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
if (keyStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname +
- " Not configured for SSL connection");
+ throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
}
SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath(keyStorePath);
sslContextFactory.setKeyStorePassword(keyStorePassword);
connector = new SslSelectChannelConnector(sslContextFactory);
}
-
connector.setPort(portNum);
// Linux:yes, Windows:no
connector.setReuseAddress(!Shell.WINDOWS);
-
- int maxIdleTime = (int) hiveConf.getTimeVar(
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
+ int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+ TimeUnit.MILLISECONDS);
connector.setMaxIdleTime(maxIdleTime);
-
+
httpServer.addConnector(connector);
+ // Thrift configs
hiveAuthFactory = new HiveAuthFactory(hiveConf);
TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TProcessor processor = processorFactory.getProcessor(null);
-
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+ // Set during the init phase of HiveServer2 if auth mode is kerberos
+ // UGI for the hive/_HOST (kerberos) principal
+ UserGroupInformation serviceUGI = cliService.getServiceUGI();
+ // UGI for the http/_HOST (SPNego) principal
+ UserGroupInformation httpUGI = cliService.getHttpUGI();
+ String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+ TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
+ serviceUGI, httpUGI);
- TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory,
- authType, serviceUGI, httpUGI);
-
+ // Context handler
final ServletContextHandler context = new ServletContextHandler(
ServletContextHandler.SESSIONS);
context.setContextPath("/");
-
+ String httpPath = getHttpPath(hiveConf
+ .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer.setHandler(context);
context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
// TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc.
+ // Finally, start the server
httpServer.start();
- String msg = "Started ThriftHttpCLIService in " + schemeName + " mode on port " + portNum +
- " path=" + httpPath +
- " with " + minWorkerThreads + ".." + maxWorkerThreads + " worker threads";
+ String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+ + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+ + maxWorkerThreads + " worker threads";
LOG.info(msg);
httpServer.join();
} catch (Throwable t) {
- LOG.error("Error: ", t);
+ LOG.fatal(
+ "Error starting HiveServer2: could not start "
+ + ThriftHttpCLIService.class.getSimpleName(), t);
+ System.exit(-1);
}
}
@@ -191,7 +185,8 @@ public class ThriftHttpCLIService extend
// NONE in case of thrift mode uses SASL
LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " +
authType + ". SASL is not supported with http transport mode," +
- " so using equivalent of " + AuthTypes.NOSASL);
+ " so using equivalent of "
+ + AuthTypes.NOSASL);
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java Tue Sep 16 19:40:51 2014
@@ -18,6 +18,8 @@
package org.apache.hive.service.server;
+import java.nio.charset.Charset;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.LogUtils;
@@ -25,12 +27,21 @@ import org.apache.hadoop.hive.common.Log
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.ServiceException;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
/**
* HiveServer2.
@@ -41,9 +52,12 @@ public class HiveServer2 extends Composi
private CLIService cliService;
private ThriftCLIService thriftCLIService;
+ private String znodePath;
+ private ZooKeeper zooKeeperClient;
+ private boolean registeredWithZooKeeper = false;
public HiveServer2() {
- super("HiveServer2");
+ super(HiveServer2.class.getSimpleName());
HiveConf.setLoadHiveServer2Config(true);
}
@@ -52,20 +66,129 @@ public class HiveServer2 extends Composi
public synchronized void init(HiveConf hiveConf) {
cliService = new CLIService();
addService(cliService);
+ if (isHTTPTransportMode(hiveConf)) {
+ thriftCLIService = new ThriftHttpCLIService(cliService);
+ } else {
+ thriftCLIService = new ThriftBinaryCLIService(cliService);
+ }
+ addService(thriftCLIService);
+ thriftCLIService.setHiveServer2(this);
+ super.init(hiveConf);
+ // Add a shutdown hook for catching SIGTERM & SIGINT
+ final HiveServer2 hiveServer2 = this;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ hiveServer2.stop();
+ }
+ });
+ }
+
+ public static boolean isHTTPTransportMode(HiveConf hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
- if(transportMode == null) {
+ if (transportMode == null) {
transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
}
- if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
- thriftCLIService = new ThriftHttpCLIService(cliService);
+ if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+ return true;
}
- else {
- thriftCLIService = new ThriftBinaryCLIService(cliService);
+ return false;
+ }
+
+ /**
+ * Adds a server instance to ZooKeeper as a znode.
+ *
+ * @param hiveConf
+ * @throws Exception
+ */
+ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+ int zooKeeperSessionTimeout =
+ hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+ String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ String instanceURI = getServerInstanceURI(hiveConf);
+ byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+ zooKeeperClient =
+ new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
+ new ZooKeeperHiveHelper.DummyWatcher());
+
+ // Create the parent znodes recursively; ignore if the parent already exists
+ try {
+ ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
+ } catch (KeeperException e) {
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
+ throw (e);
+ }
}
+ // Create a znode under the rootNamespace parent for this instance of the server
+ // Znode name: server-host:port-versionInfo-sequence
+ try {
+ String znodePath =
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-"
+ + HiveVersionInfo.getVersion() + "-";
+ znodePath =
+ zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ setRegisteredWithZooKeeper(true);
+ // Set a watch on the znode
+ if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+ // No node exists, throw exception
+ throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
+ }
+ LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
+ } catch (KeeperException e) {
+ LOG.fatal("Unable to create a znode for this server instance", e);
+ throw new Exception(e);
+ }
+ }
- addService(thriftCLIService);
- super.init(hiveConf);
+ /**
+ * The watcher class which sets the de-register flag when the znode corresponding to this server
+ * instance is deleted. Additionally, it shuts down the server if there are no more active client
+ * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
+ */
+ private class DeRegisterWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
+ HiveServer2.this.setRegisteredWithZooKeeper(false);
+ // If there are no more active client sessions, stop the server
+ if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+ LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ + "instances available for dynamic service discovery. "
+ + "The last client session has ended - will shutdown now.");
+ HiveServer2.this.stop();
+ }
+ LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ + "The server will be shut down after the last client sesssion completes.");
+ }
+ }
+ }
+
+ private void removeServerInstanceFromZooKeeper() throws Exception {
+ setRegisteredWithZooKeeper(false);
+ zooKeeperClient.close();
+ LOG.info("Server instance removed from ZooKeeper.");
+ }
+
+ public boolean isRegisteredWithZooKeeper() {
+ return registeredWithZooKeeper;
+ }
+
+ private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) {
+ this.registeredWithZooKeeper = registeredWithZooKeeper;
+ }
+
+ private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+ if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
+ throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+ }
+ return thriftCLIService.getServerAddress().getHostName() + ":"
+ + thriftCLIService.getPortNumber();
}
@Override
@@ -75,23 +198,32 @@ public class HiveServer2 extends Composi
@Override
public synchronized void stop() {
- super.stop();
- // there should already be an instance of the session pool manager.
- // if not, ignoring is fine while stopping the hive server.
+ LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
+ super.stop();
+ // Remove this server instance from ZooKeeper if dynamic service discovery is set
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ try {
+ removeServerInstanceFromZooKeeper();
+ } catch (Exception e) {
+ LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+ }
+ }
+ // There should already be an instance of the session pool manager.
+ // If not, ignoring is fine while stopping HiveServer2.
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
try {
TezSessionPoolManager.getInstance().stop();
} catch (Exception e) {
- LOG.error("Tez session pool manager stop had an error during stop of hive server");
- e.printStackTrace();
+ LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+ + "Shutting down HiveServer2 anyway.", e);
}
}
}
private static void startHiveServer2() throws Throwable {
long attempts = 0, maxAttempts = 1;
- while(true) {
+ while (true) {
HiveConf hiveConf = new HiveConf();
maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
HiveServer2 server = null;
@@ -99,6 +231,11 @@ public class HiveServer2 extends Composi
server = new HiveServer2();
server.init(hiveConf);
server.start();
+ // If we're supporting dynamic service discovery, we'll add the service uri for this
+ // HiveServer2 instance to Zookeeper as a znode.
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ server.addServerInstanceToZooKeeper(hiveConf);
+ }
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
sessionPool.setupPool(hiveConf);
@@ -106,19 +243,19 @@ public class HiveServer2 extends Composi
}
break;
} catch (Throwable throwable) {
- if(++attempts >= maxAttempts) {
+ if (++attempts >= maxAttempts) {
throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
} else {
- LOG.warn("Error starting HiveServer2 on attempt " + attempts +
- ", will retry in 60 seconds", throwable);
+ LOG.warn("Error starting HiveServer2 on attempt " + attempts
+ + ", will retry in 60 seconds", throwable);
try {
if (server != null) {
server.stop();
server = null;
}
} catch (Exception e) {
- LOG.info("Exception caught when calling stop of HiveServer2 before" +
- " retrying start", e);
+ LOG.info(
+ "Exception caught when calling stop of HiveServer2 before" + " retrying start", e);
}
try {
Thread.sleep(60L * 1000L);
@@ -139,14 +276,15 @@ public class HiveServer2 extends Composi
System.exit(-1);
}
- //NOTE: It is critical to do this here so that log4j is reinitialized
+ // NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
String initLog4jMessage = LogUtils.initHiveLog4j();
LOG.debug(initLog4jMessage);
HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
- //log debug message from "oproc" after log4j initialize properly
+ // log debug message from "oproc" after log4j initialize properly
LOG.debug(oproc.getDebugMessage().toString());
+
startHiveServer2();
} catch (LogInitializationException e) {
LOG.error("Error initializing log: " + e.getMessage(), e);
@@ -156,6 +294,5 @@ public class HiveServer2 extends Composi
System.exit(-1);
}
}
-
}
Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Tue Sep 16 19:40:51 2014
@@ -27,7 +27,11 @@ import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
import org.junit.After;
@@ -83,7 +87,7 @@ public class TestSessionGlobalInitFile e
// set up service and client
HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION,
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION,
initFile.getParentFile().getAbsolutePath());
service = new FakeEmbeddedThriftBinaryCLIService(hiveConf);
service.init(new HiveConf());