Posted to commits@hive.apache.org by vg...@apache.org on 2014/09/16 21:40:52 UTC

svn commit: r1625363 [2/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ itests/hive-unit/src/test/java/org/apache/hive/beeline/ itests/hive-unit/src/test/java/org/apache/hive/jdbc/ jdbc/ jdbc/src/java/org/apache/hive/jdbc/ ql/src/java...

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep 16 19:40:51 2014
@@ -48,100 +48,94 @@ import org.eclipse.jetty.util.thread.Exe
 public class ThriftHttpCLIService extends ThriftCLIService {
 
   public ThriftHttpCLIService(CLIService cliService) {
-    super(cliService, "ThriftHttpCLIService");
+    super(cliService, ThriftHttpCLIService.class.getSimpleName());
   }
 
+  /**
+   * Configure Jetty to serve http requests. Example of a client connection URL:
+   * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
+   * e.g. http://gateway:port/hive2/servlets/thrifths2/
+   */
   @Override
   public void run() {
     try {
-      // Configure Jetty to serve http requests
-      // Example of a client connection URL: http://localhost:10000/servlets/thrifths2/
-      // a gateway may cause actual target URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/
-
+      // Verify config validity
       verifyHttpConfiguration(hiveConf);
 
-      String portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
-      if (portString != null) {
-        portNum = Integer.valueOf(portString);
-      } else {
-        portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
-      }
-
-      minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
-      maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
-      workerKeepAliveTime = hiveConf.getTimeVar(
-          ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
-
-      String httpPath =  getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
-
+      // HTTP Server
       httpServer = new org.eclipse.jetty.server.Server();
+
+      // Server thread pool
       String threadPoolName = "HiveServer2-HttpHandler-Pool";
       ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
           workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
           new ThreadFactoryWithGarbageCleanup(threadPoolName));
-
       ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
       httpServer.setThreadPool(threadPool);
 
-      SelectChannelConnector connector = new SelectChannelConnector();;
+      // Connector configs
+      SelectChannelConnector connector = new SelectChannelConnector();
       boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
       String schemeName = useSsl ? "https" : "http";
-      String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
-      // Set during the init phase of HiveServer2 if auth mode is kerberos
-      // UGI for the hive/_HOST (kerberos) principal
-      UserGroupInformation serviceUGI = cliService.getServiceUGI();
-      // UGI for the http/_HOST (SPNego) principal
-      UserGroupInformation httpUGI = cliService.getHttpUGI();
-
+      // Change connector if SSL is used
       if (useSsl) {
         String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
         String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
             HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
         if (keyStorePath.isEmpty()) {
-          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname +
-              " Not configured for SSL connection");
+          throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+              + " Not configured for SSL connection");
         }
         SslContextFactory sslContextFactory = new SslContextFactory();
         sslContextFactory.setKeyStorePath(keyStorePath);
         sslContextFactory.setKeyStorePassword(keyStorePassword);
         connector = new SslSelectChannelConnector(sslContextFactory);
       }
-      
       connector.setPort(portNum);
       // Linux:yes, Windows:no
       connector.setReuseAddress(!Shell.WINDOWS);
-      
-      int maxIdleTime = (int) hiveConf.getTimeVar(
-          ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
+      int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+          TimeUnit.MILLISECONDS);
       connector.setMaxIdleTime(maxIdleTime);
-      
+
       httpServer.addConnector(connector);
 
+      // Thrift configs
       hiveAuthFactory = new HiveAuthFactory(hiveConf);
       TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
       TProcessor processor = processorFactory.getProcessor(null);
-
       TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+      // Set during the init phase of HiveServer2 if auth mode is kerberos
+      // UGI for the hive/_HOST (kerberos) principal
+      UserGroupInformation serviceUGI = cliService.getServiceUGI();
+      // UGI for the http/_HOST (SPNego) principal
+      UserGroupInformation httpUGI = cliService.getHttpUGI();
+      String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
+          serviceUGI, httpUGI);
 
-      TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory,
-          authType, serviceUGI, httpUGI);
-
+      // Context handler
       final ServletContextHandler context = new ServletContextHandler(
           ServletContextHandler.SESSIONS);
       context.setContextPath("/");
-
+      String httpPath = getHttpPath(hiveConf
+          .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
       httpServer.setHandler(context);
       context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
 
       // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyReceiveDuration, etc.
+      // Finally, start the server
       httpServer.start();
-      String msg = "Started ThriftHttpCLIService in " + schemeName + " mode on port " + portNum +
-          " path=" + httpPath +
-          " with " + minWorkerThreads + ".." + maxWorkerThreads + " worker threads";
+      String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+          + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+          + maxWorkerThreads + " worker threads";
       LOG.info(msg);
       httpServer.join();
     } catch (Throwable t) {
-      LOG.error("Error: ", t);
+      LOG.fatal(
+          "Error starting HiveServer2: could not start "
+              + ThriftHttpCLIService.class.getSimpleName(), t);
+      System.exit(-1);
     }
   }
 
@@ -191,7 +185,8 @@ public class ThriftHttpCLIService extend
       // NONE in case of thrift mode uses SASL
       LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " +
           authType + ". SASL is not supported with http transport mode," +
-          " so using equivalent of " + AuthTypes.NOSASL);
+ " so using equivalent of "
+          + AuthTypes.NOSASL);
     }
   }
 

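A brief note for readers less familiar with the Jetty 7/8 APIs used in this file: below is a minimal, self-contained sketch of the same wiring, i.e. a Server backed by an ExecutorThreadPool, a SelectChannelConnector, and a ServletContextHandler that mounts a servlet on a path. Only the Jetty class names match what the patch imports; the port, idle timeout, thread count, servlet, and path are illustrative stand-ins for the values the patch reads from HiveConf (ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, ...HTTP_MAX_IDLE_TIME, ...HTTP_MIN/MAX_WORKER_THREADS, ...HTTP_PATH) and for ThriftHttpServlet.

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.server.nio.SelectChannelConnector;
    import org.eclipse.jetty.servlet.ServletContextHandler;
    import org.eclipse.jetty.servlet.ServletHolder;
    import org.eclipse.jetty.util.thread.ExecutorThreadPool;

    public class JettyHttpWiringSketch {
      public static void main(String[] args) throws Exception {
        // Server backed by an externally managed thread pool, as in the patch.
        Server server = new Server();
        ExecutorService workers = Executors.newFixedThreadPool(5); // stand-in for the worker-thread settings
        server.setThreadPool(new ExecutorThreadPool(workers));

        // Plain (non-SSL) connector; the patch swaps in SslSelectChannelConnector when SSL is enabled.
        SelectChannelConnector connector = new SelectChannelConnector();
        connector.setPort(10001);            // stand-in for ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT
        connector.setMaxIdleTime(30 * 1000); // stand-in for ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME
        server.addConnector(connector);

        // Root context with a placeholder servlet where the patch mounts ThriftHttpServlet.
        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath("/");
        context.addServlet(new ServletHolder(new HttpServlet() {
          @Override
          protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
            resp.getWriter().println("ok");
          }
        }), "/cliservice/*"); // illustrative path; the patch derives its path from ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH

        server.setHandler(context);
        server.start();
        server.join();
      }
    }

The same shape is visible in run() above: build the thread pool, pick a connector (plain or SSL), mount the Thrift servlet under the configured path, then start and join.
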
Modified: hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/server/HiveServer2.java Tue Sep 16 19:40:51 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hive.service.server;
 
+import java.nio.charset.Charset;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.LogUtils;
@@ -25,12 +27,21 @@ import org.apache.hadoop.hive.common.Log
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.cli.CLIService;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
  * HiveServer2.
@@ -41,9 +52,12 @@ public class HiveServer2 extends Composi
 
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
+  private String znodePath;
+  private ZooKeeper zooKeeperClient;
+  private boolean registeredWithZooKeeper = false;
 
   public HiveServer2() {
-    super("HiveServer2");
+    super(HiveServer2.class.getSimpleName());
     HiveConf.setLoadHiveServer2Config(true);
   }
 
@@ -52,20 +66,129 @@ public class HiveServer2 extends Composi
   public synchronized void init(HiveConf hiveConf) {
     cliService = new CLIService();
     addService(cliService);
+    if (isHTTPTransportMode(hiveConf)) {
+      thriftCLIService = new ThriftHttpCLIService(cliService);
+    } else {
+      thriftCLIService = new ThriftBinaryCLIService(cliService);
+    }
+    addService(thriftCLIService);
+    thriftCLIService.setHiveServer2(this);
+    super.init(hiveConf);
 
+    // Add a shutdown hook for catching SIGTERM & SIGINT
+    final HiveServer2 hiveServer2 = this;
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    });
+  }
+
+  public static boolean isHTTPTransportMode(HiveConf hiveConf) {
     String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
-    if(transportMode == null) {
+    if (transportMode == null) {
       transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
     }
-    if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
-      thriftCLIService = new ThriftHttpCLIService(cliService);
+    if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+      return true;
     }
-    else {
-      thriftCLIService = new ThriftBinaryCLIService(cliService);
+    return false;
+  }
+
+  /**
+   * Adds a server instance to ZooKeeper as a znode.
+   *
+   * @param hiveConf
+   * @throws Exception
+   */
+  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+    int zooKeeperSessionTimeout =
+        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+    String instanceURI = getServerInstanceURI(hiveConf);
+    byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+    zooKeeperClient =
+        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
+            new ZooKeeperHiveHelper.DummyWatcher());
+
+    // Create the parent znodes recursively; ignore if the parent already exists
+    try {
+      ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace,
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
+    } catch (KeeperException e) {
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
+        throw (e);
+      }
     }
+    // Create a znode under the rootNamespace parent for this instance of the server
+    // Znode name: server-host:port-versionInfo-sequence
+    try {
+      String znodePath =
+          ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+              + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-"
+              + HiveVersionInfo.getVersion() + "-";
+      znodePath =
+          zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE,
+              CreateMode.EPHEMERAL_SEQUENTIAL);
+      setRegisteredWithZooKeeper(true);
+      // Set a watch on the znode
+      if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+        // No node exists, throw exception
+        throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
+      }
+      LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
+    } catch (KeeperException e) {
+      LOG.fatal("Unable to create a znode for this server instance", e);
+      throw new Exception(e);
+    }
+  }
 
-    addService(thriftCLIService);
-    super.init(hiveConf);
+  /**
+   * The watcher class which sets the de-register flag when the znode corresponding to this server
+   * instance is deleted. Additionally, it shuts down the server if there are no more active client
+   * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
+   */
+  private class DeRegisterWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+      if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
+        HiveServer2.this.setRegisteredWithZooKeeper(false);
+        // If there are no more active client sessions, stop the server
+        if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+          LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+              + "instances available for dynamic service discovery. "
+              + "The last client session has ended - will shutdown now.");
+          HiveServer2.this.stop();
+        }
+        LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+            + "The server will be shut down after the last client sesssion completes.");
+      }
+    }
+  }
+
+  private void removeServerInstanceFromZooKeeper() throws Exception {
+    setRegisteredWithZooKeeper(false);
+    zooKeeperClient.close();
+    LOG.info("Server instance removed from ZooKeeper.");
+  }
+
+  public boolean isRegisteredWithZooKeeper() {
+    return registeredWithZooKeeper;
+  }
+
+  private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) {
+    this.registeredWithZooKeeper = registeredWithZooKeeper;
+  }
+
+  private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+    if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
+      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+    }
+    return thriftCLIService.getServerAddress().getHostName() + ":"
+        + thriftCLIService.getPortNumber();
   }
 
   @Override
@@ -75,23 +198,32 @@ public class HiveServer2 extends Composi
 
   @Override
   public synchronized void stop() {
-    super.stop();
-    // there should already be an instance of the session pool manager.
-    // if not, ignoring is fine while stopping the hive server.
+    LOG.info("Shutting down HiveServer2");
     HiveConf hiveConf = this.getHiveConf();
+    super.stop();
+    // Remove this server instance from ZooKeeper if dynamic service discovery is set
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+      try {
+        removeServerInstanceFromZooKeeper();
+      } catch (Exception e) {
+        LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+      }
+    }
+    // There should already be an instance of the session pool manager.
+    // If not, ignoring is fine while stopping HiveServer2.
     if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
       try {
         TezSessionPoolManager.getInstance().stop();
       } catch (Exception e) {
-        LOG.error("Tez session pool manager stop had an error during stop of hive server");
-        e.printStackTrace();
+        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+            + "Shutting down HiveServer2 anyway.", e);
       }
     }
   }
 
   private static void startHiveServer2() throws Throwable {
     long attempts = 0, maxAttempts = 1;
-    while(true) {
+    while (true) {
       HiveConf hiveConf = new HiveConf();
       maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
       HiveServer2 server = null;
@@ -99,6 +231,11 @@ public class HiveServer2 extends Composi
         server = new HiveServer2();
         server.init(hiveConf);
         server.start();
+        // If we're supporting dynamic service discovery, we'll add the service uri for this
+        // HiveServer2 instance to Zookeeper as a znode.
+        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+          server.addServerInstanceToZooKeeper(hiveConf);
+        }
         if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
           TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
           sessionPool.setupPool(hiveConf);
@@ -106,19 +243,19 @@ public class HiveServer2 extends Composi
         }
         break;
       } catch (Throwable throwable) {
-        if(++attempts >= maxAttempts) {
+        if (++attempts >= maxAttempts) {
           throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
         } else {
-          LOG.warn("Error starting HiveServer2 on attempt " + attempts +
-            ", will retry in 60 seconds", throwable);
+          LOG.warn("Error starting HiveServer2 on attempt " + attempts
+              + ", will retry in 60 seconds", throwable);
           try {
             if (server != null) {
               server.stop();
               server = null;
             }
           } catch (Exception e) {
-            LOG.info("Exception caught when calling stop of HiveServer2 before" +
-              " retrying start", e);
+            LOG.info(
+                "Exception caught when calling stop of HiveServer2 before" + " retrying start", e);
           }
           try {
             Thread.sleep(60L * 1000L);
@@ -139,14 +276,15 @@ public class HiveServer2 extends Composi
         System.exit(-1);
       }
 
-      //NOTE: It is critical to do this here so that log4j is reinitialized
+      // NOTE: It is critical to do this here so that log4j is reinitialized
       // before any of the other core hive classes are loaded
       String initLog4jMessage = LogUtils.initHiveLog4j();
       LOG.debug(initLog4jMessage);
 
       HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
-      //log debug message from "oproc" after log4j initialize properly
+      // log debug message from "oproc" after log4j initialize properly
       LOG.debug(oproc.getDebugMessage().toString());
+
       startHiveServer2();
     } catch (LogInitializationException e) {
       LOG.error("Error initializing log: " + e.getMessage(), e);
@@ -156,6 +294,5 @@ public class HiveServer2 extends Composi
       System.exit(-1);
     }
   }
-
 }
 

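To show what the registration in addServerInstanceToZooKeeper() enables, here is a hedged sketch of the client side of dynamic service discovery: list the ephemeral-sequential znodes that live HiveServer2 instances create under the configured namespace and read one child's data to obtain its host:port URI (the patch stores the URI as UTF-8 bytes in each znode). The ensemble string and the /hiveserver2 namespace below are illustrative; a real client would take them from hive.zookeeper.quorum and hive.server2.zookeeper.namespace, and this is not the JDBC driver code touched elsewhere in this commit.

    import java.nio.charset.Charset;
    import java.util.List;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class DiscoveryLookupSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative ensemble and session timeout; real values come from HiveConf.
        ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 60000, new Watcher() {
          public void process(WatchedEvent event) {
            // no-op watcher, mirroring ZooKeeperHiveHelper.DummyWatcher in the patch
          }
        });
        try {
          // Each live HiveServer2 registers an ephemeral-sequential child under the namespace,
          // named server-<host:port>-<version>-<seq>, with "host:port" as the znode data.
          List<String> servers = zk.getChildren("/hiveserver2", false);
          if (servers.isEmpty()) {
            throw new IllegalStateException("No HiveServer2 instances registered");
          }
          // Pick any instance; a real client could randomize for simple load spreading.
          byte[] data = zk.getData("/hiveserver2/" + servers.get(0), false, null);
          String hostPort = new String(data, Charset.forName("UTF-8"));
          System.out.println("Resolved HiveServer2 at " + hostPort);
        } finally {
          zk.close();
        }
      }
    }

Because the znodes are ephemeral, an instance that dies or is de-registered (see DeRegisterWatcher above) simply disappears from this listing.
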
Modified: hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1625363&r1=1625362&r2=1625363&view=diff
==============================================================================
--- hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original)
+++ hive/trunk/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Tue Sep 16 19:40:51 2014
@@ -27,7 +27,11 @@ import junit.framework.TestCase;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
 import org.junit.After;
@@ -83,7 +87,7 @@ public class TestSessionGlobalInitFile e
 
     // set up service and client
     HiveConf hiveConf = new HiveConf();
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION,
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION,
         initFile.getParentFile().getAbsolutePath());
     service = new FakeEmbeddedThriftBinaryCLIService(hiveConf);
     service.init(new HiveConf());
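
As a small usage note on the renamed configuration variable exercised by this test (HIVE_GLOBAL_INIT_FILE_LOCATION becomes HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION): a minimal sketch of setting it programmatically, assuming a hypothetical directory that holds the global init script applied to new HiveServer2 sessions. The test above does the equivalent with a temporary directory into which it writes the script.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class GlobalInitFileConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Hypothetical directory holding the global init script; the test points this
        // at a temp directory instead and then opens sessions against an embedded service.
        conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION,
            "/etc/hive/conf.d");
        System.out.println(conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION));
      }
    }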