You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/09/20 19:34:43 UTC

svn commit: r1626482 [6/6] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/ common/src/java/org/apache/hadoop/hive/conf/ data/files/ hcatalog/hc...

Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Sat Sep 20 17:34:39 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hive.service.server;
 
+import java.nio.charset.Charset;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.LogUtils;
@@ -26,12 +28,21 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.cli.CLIService;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
  * HiveServer2.
@@ -42,9 +53,12 @@ public class HiveServer2 extends Composi
 
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
+  private String znodePath;
+  private ZooKeeper zooKeeperClient;
+  private boolean registeredWithZooKeeper = false;
 
   public HiveServer2() {
-    super("HiveServer2");
+    super(HiveServer2.class.getSimpleName());
     HiveConf.setLoadHiveServer2Config(true);
   }
 
@@ -53,20 +67,129 @@ public class HiveServer2 extends Composi
   public synchronized void init(HiveConf hiveConf) {
     cliService = new CLIService();
     addService(cliService);
+    if (isHTTPTransportMode(hiveConf)) {
+      thriftCLIService = new ThriftHttpCLIService(cliService);
+    } else {
+      thriftCLIService = new ThriftBinaryCLIService(cliService);
+    }
+    addService(thriftCLIService);
+    thriftCLIService.setHiveServer2(this);
+    super.init(hiveConf);
 
+    // Add a shutdown hook for catching SIGTERM & SIGINT
+    final HiveServer2 hiveServer2 = this;
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        hiveServer2.stop();
+      }
+    });
+  }
+
+  public static boolean isHTTPTransportMode(HiveConf hiveConf) {
     String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
-    if(transportMode == null) {
+    if (transportMode == null) {
       transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
     }
-    if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
-      thriftCLIService = new ThriftHttpCLIService(cliService);
+    if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+      return true;
     }
-    else {
-      thriftCLIService = new ThriftBinaryCLIService(cliService);
+    return false;
+  }
+
+  /**
+   * Adds a server instance to ZooKeeper as a znode.
+   *
+   * @param hiveConf
+   * @throws Exception
+   */
+  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+    int zooKeeperSessionTimeout =
+        hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+    String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+    String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+    String instanceURI = getServerInstanceURI(hiveConf);
+    byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+    zooKeeperClient =
+        new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
+            new ZooKeeperHiveHelper.DummyWatcher());
+
+    // Create the parent znodes recursively; ignore if the parent already exists
+    try {
+      ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace,
+          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
+    } catch (KeeperException e) {
+      if (e.code() != KeeperException.Code.NODEEXISTS) {
+        LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
+        throw (e);
+      }
     }
+    // Create a znode under the rootNamespace parent for this instance of the server
+    // Znode name: server-host:port-versionInfo-sequence
+    try {
+      String znodePath =
+          ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+              + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-"
+              + HiveVersionInfo.getVersion() + "-";
+      znodePath =
+          zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE,
+              CreateMode.EPHEMERAL_SEQUENTIAL);
+      setRegisteredWithZooKeeper(true);
+      // Set a watch on the znode
+      if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+        // No node exists, throw exception
+        throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
+      }
+      LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
+    } catch (KeeperException e) {
+      LOG.fatal("Unable to create a znode for this server instance", e);
+      throw new Exception(e);
+    }
+  }
 
-    addService(thriftCLIService);
-    super.init(hiveConf);
+  /**
+   * The watcher class which sets the de-register flag when the znode corresponding to this server
+   * instance is deleted. Additionally, it shuts down the server if there are no more active client
+   * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
+   */
+  private class DeRegisterWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+      if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
+        HiveServer2.this.setRegisteredWithZooKeeper(false);
+        // If there are no more active client sessions, stop the server
+        if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+          LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+              + "instances available for dynamic service discovery. "
+              + "The last client session has ended - will shutdown now.");
+          HiveServer2.this.stop();
+        }
+        LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+            + "The server will be shut down after the last client sesssion completes.");
+      }
+    }
+  }
+
+  private void removeServerInstanceFromZooKeeper() throws Exception {
+    setRegisteredWithZooKeeper(false);
+    zooKeeperClient.close();
+    LOG.info("Server instance removed from ZooKeeper.");
+  }
+
+  public boolean isRegisteredWithZooKeeper() {
+    return registeredWithZooKeeper;
+  }
+
+  private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) {
+    this.registeredWithZooKeeper = registeredWithZooKeeper;
+  }
+
+  private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+    if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
+      throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+    }
+    return thriftCLIService.getServerAddress().getHostName() + ":"
+        + thriftCLIService.getPortNumber();
   }
 
   @Override
@@ -76,16 +199,25 @@ public class HiveServer2 extends Composi
 
   @Override
   public synchronized void stop() {
-    super.stop();
-    // there should already be an instance of the session pool manager.
-    // if not, ignoring is fine while stopping the hive server.
+    LOG.info("Shutting down HiveServer2");
     HiveConf hiveConf = this.getHiveConf();
+    super.stop();
+    // Remove this server instance from ZooKeeper if dynamic service discovery is set
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+      try {
+        removeServerInstanceFromZooKeeper();
+      } catch (Exception e) {
+        LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+      }
+    }
+    // There should already be an instance of the session pool manager.
+    // If not, ignoring is fine while stopping HiveServer2.
     if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
       try {
         TezSessionPoolManager.getInstance().stop();
       } catch (Exception e) {
-        LOG.error("Tez session pool manager stop had an error during stop of hive server");
-        e.printStackTrace();
+        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+            + "Shutting down HiveServer2 anyway.", e);
       }
     }
 
@@ -100,7 +232,7 @@ public class HiveServer2 extends Composi
 
   private static void startHiveServer2() throws Throwable {
     long attempts = 0, maxAttempts = 1;
-    while(true) {
+    while (true) {
       HiveConf hiveConf = new HiveConf();
       maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
       HiveServer2 server = null;
@@ -108,6 +240,11 @@ public class HiveServer2 extends Composi
         server = new HiveServer2();
         server.init(hiveConf);
         server.start();
+        // If we're supporting dynamic service discovery, we'll add the service uri for this
+        // HiveServer2 instance to Zookeeper as a znode.
+        if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+          server.addServerInstanceToZooKeeper(hiveConf);
+        }
         if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
           TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
           sessionPool.setupPool(hiveConf);
@@ -119,19 +256,19 @@ public class HiveServer2 extends Composi
         }
         break;
       } catch (Throwable throwable) {
-        if(++attempts >= maxAttempts) {
+        if (++attempts >= maxAttempts) {
           throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
         } else {
-          LOG.warn("Error starting HiveServer2 on attempt " + attempts +
-            ", will retry in 60 seconds", throwable);
+          LOG.warn("Error starting HiveServer2 on attempt " + attempts
+              + ", will retry in 60 seconds", throwable);
           try {
             if (server != null) {
               server.stop();
               server = null;
             }
           } catch (Exception e) {
-            LOG.info("Exception caught when calling stop of HiveServer2 before" +
-              " retrying start", e);
+            LOG.info(
+                "Exception caught when calling stop of HiveServer2 before" + " retrying start", e);
           }
           try {
             Thread.sleep(60L * 1000L);
@@ -152,14 +289,15 @@ public class HiveServer2 extends Composi
         System.exit(-1);
       }
 
-      //NOTE: It is critical to do this here so that log4j is reinitialized
+      // NOTE: It is critical to do this here so that log4j is reinitialized
       // before any of the other core hive classes are loaded
       String initLog4jMessage = LogUtils.initHiveLog4j();
       LOG.debug(initLog4jMessage);
 
       HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
-      //log debug message from "oproc" after log4j initialize properly
+      // log debug message from "oproc" after log4j initialize properly
       LOG.debug(oproc.getDebugMessage().toString());
+
       startHiveServer2();
     } catch (LogInitializationException e) {
       LOG.error("Error initializing log: " + e.getMessage(), e);
@@ -169,6 +307,5 @@ public class HiveServer2 extends Composi
       System.exit(-1);
     }
   }
-
 }
 

Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Sat Sep 20 17:34:39 2014
@@ -27,7 +27,11 @@ import junit.framework.TestCase;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
 import org.junit.After;
@@ -83,7 +87,7 @@ public class TestSessionGlobalInitFile e
 
     // set up service and client
     HiveConf hiveConf = new HiveConf();
-    hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION,
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION,
         initFile.getParentFile().getAbsolutePath());
     service = new FakeEmbeddedThriftBinaryCLIService(hiveConf);
     service.init(new HiveConf());

Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Sat Sep 20 17:34:39 2014
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
@@ -652,6 +653,17 @@ public class Hadoop20Shims implements Ha
   }
 
   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+                                                             FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.sync();
   }

Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Sat Sep 20 17:34:39 2014
@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -403,6 +404,17 @@ public class Hadoop20SShims extends Hado
   }
 
   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+                                                             FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.sync();
   }

Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Sat Sep 20 17:34:39 2014
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -511,6 +512,17 @@ public class Hadoop23Shims extends Hadoo
   }
 
   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+                                                             FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.hflush();
   }

Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Sat Sep 20 17:34:39 2014
@@ -30,6 +30,7 @@ import java.security.PrivilegedException
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import javax.security.auth.login.LoginException;
 
@@ -477,6 +478,19 @@ public interface HadoopShims {
       FileStatus status) throws IOException;
 
   /**
+   * For the block locations returned by getLocations() convert them into a Treemap
+   * <Offset,blockLocation> by iterating over the list of blockLocation.
+   * Using TreeMap from offset to blockLocation, makes it O(logn) to get a particular
+   * block based upon offset.
+   * @param fs the file system
+   * @param status the file information
+   * @return TreeMap<Long, BlockLocation>
+   * @throws IOException
+   */
+  TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+      FileStatus status) throws IOException;
+
+  /**
    * Flush and make visible to other users the changes to the given stream.
    * @param stream the stream to hflush.
    * @throws IOException