You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/09/20 19:34:43 UTC
svn commit: r1626482 [6/6] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/
common/src/java/org/apache/hadoop/hive/conf/ data/files/ hcatalog/hc...
Modified: hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java (original)
+++ hive/branches/spark/service/src/java/org/apache/hive/service/server/HiveServer2.java Sat Sep 20 17:34:39 2014
@@ -18,6 +18,8 @@
package org.apache.hive.service.server;
+import java.nio.charset.Charset;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.LogUtils;
@@ -26,12 +28,21 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.CompositeService;
+import org.apache.hive.service.ServiceException;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
/**
* HiveServer2.
@@ -42,9 +53,12 @@ public class HiveServer2 extends Composi
private CLIService cliService;
private ThriftCLIService thriftCLIService;
+ private String znodePath;
+ private ZooKeeper zooKeeperClient;
+ private boolean registeredWithZooKeeper = false;
public HiveServer2() {
- super("HiveServer2");
+ super(HiveServer2.class.getSimpleName());
HiveConf.setLoadHiveServer2Config(true);
}
@@ -53,20 +67,129 @@ public class HiveServer2 extends Composi
public synchronized void init(HiveConf hiveConf) {
cliService = new CLIService();
addService(cliService);
+ if (isHTTPTransportMode(hiveConf)) {
+ thriftCLIService = new ThriftHttpCLIService(cliService);
+ } else {
+ thriftCLIService = new ThriftBinaryCLIService(cliService);
+ }
+ addService(thriftCLIService);
+ thriftCLIService.setHiveServer2(this);
+ super.init(hiveConf);
+ // Add a shutdown hook for catching SIGTERM & SIGINT
+ final HiveServer2 hiveServer2 = this;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ hiveServer2.stop();
+ }
+ });
+ }
+
+ public static boolean isHTTPTransportMode(HiveConf hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
- if(transportMode == null) {
+ if (transportMode == null) {
transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
}
- if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
- thriftCLIService = new ThriftHttpCLIService(cliService);
+ if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+ return true;
}
- else {
- thriftCLIService = new ThriftBinaryCLIService(cliService);
+ return false;
+ }
+
+ /**
+ * Adds a server instance to ZooKeeper as a znode.
+ *
+ * @param hiveConf
+ * @throws Exception
+ */
+ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+ int zooKeeperSessionTimeout =
+ hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+ String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ String instanceURI = getServerInstanceURI(hiveConf);
+ byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
+ zooKeeperClient =
+ new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
+ new ZooKeeperHiveHelper.DummyWatcher());
+
+ // Create the parent znodes recursively; ignore if the parent already exists
+ try {
+ ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
+ } catch (KeeperException e) {
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
+ throw (e);
+ }
}
+ // Create a znode under the rootNamespace parent for this instance of the server
+ // Znode name: server-host:port-versionInfo-sequence
+ try {
+ String znodePath =
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-"
+ + HiveVersionInfo.getVersion() + "-";
+ znodePath =
+ zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ setRegisteredWithZooKeeper(true);
+ // Set a watch on the znode
+ if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+ // No node exists, throw exception
+ throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
+ }
+ LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
+ } catch (KeeperException e) {
+ LOG.fatal("Unable to create a znode for this server instance", e);
+ throw new Exception(e);
+ }
+ }
- addService(thriftCLIService);
- super.init(hiveConf);
+ /**
+ * The watcher class which sets the de-register flag when the znode corresponding to this server
+ * instance is deleted. Additionally, it shuts down the server if there are no more active client
+ * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
+ */
+ private class DeRegisterWatcher implements Watcher {
+ public void process(WatchedEvent event) {
+ if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
+ HiveServer2.this.setRegisteredWithZooKeeper(false);
+ // If there are no more active client sessions, stop the server
+ if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+ LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ + "instances available for dynamic service discovery. "
+ + "The last client session has ended - will shutdown now.");
+ HiveServer2.this.stop();
+ }
+ LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ + "The server will be shut down after the last client sesssion completes.");
+ }
+ }
+ }
+
+ private void removeServerInstanceFromZooKeeper() throws Exception {
+ setRegisteredWithZooKeeper(false);
+ zooKeeperClient.close();
+ LOG.info("Server instance removed from ZooKeeper.");
+ }
+
+ public boolean isRegisteredWithZooKeeper() {
+ return registeredWithZooKeeper;
+ }
+
+ private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) {
+ this.registeredWithZooKeeper = registeredWithZooKeeper;
+ }
+
+ private String getServerInstanceURI(HiveConf hiveConf) throws Exception {
+ if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) {
+ throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
+ }
+ return thriftCLIService.getServerAddress().getHostName() + ":"
+ + thriftCLIService.getPortNumber();
}
@Override
@@ -76,16 +199,25 @@ public class HiveServer2 extends Composi
@Override
public synchronized void stop() {
- super.stop();
- // there should already be an instance of the session pool manager.
- // if not, ignoring is fine while stopping the hive server.
+ LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
+ super.stop();
+ // Remove this server instance from ZooKeeper if dynamic service discovery is set
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ try {
+ removeServerInstanceFromZooKeeper();
+ } catch (Exception e) {
+ LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+ }
+ }
+ // There should already be an instance of the session pool manager.
+ // If not, ignoring is fine while stopping HiveServer2.
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
try {
TezSessionPoolManager.getInstance().stop();
} catch (Exception e) {
- LOG.error("Tez session pool manager stop had an error during stop of hive server");
- e.printStackTrace();
+ LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+ + "Shutting down HiveServer2 anyway.", e);
}
}
@@ -100,7 +232,7 @@ public class HiveServer2 extends Composi
private static void startHiveServer2() throws Throwable {
long attempts = 0, maxAttempts = 1;
- while(true) {
+ while (true) {
HiveConf hiveConf = new HiveConf();
maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
HiveServer2 server = null;
@@ -108,6 +240,11 @@ public class HiveServer2 extends Composi
server = new HiveServer2();
server.init(hiveConf);
server.start();
+ // If we're supporting dynamic service discovery, we'll add the service URI for this
+ // HiveServer2 instance to ZooKeeper as a znode.
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ server.addServerInstanceToZooKeeper(hiveConf);
+ }
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
sessionPool.setupPool(hiveConf);
@@ -119,19 +256,19 @@ public class HiveServer2 extends Composi
}
break;
} catch (Throwable throwable) {
- if(++attempts >= maxAttempts) {
+ if (++attempts >= maxAttempts) {
throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
} else {
- LOG.warn("Error starting HiveServer2 on attempt " + attempts +
- ", will retry in 60 seconds", throwable);
+ LOG.warn("Error starting HiveServer2 on attempt " + attempts
+ + ", will retry in 60 seconds", throwable);
try {
if (server != null) {
server.stop();
server = null;
}
} catch (Exception e) {
- LOG.info("Exception caught when calling stop of HiveServer2 before" +
- " retrying start", e);
+ LOG.info(
+ "Exception caught when calling stop of HiveServer2 before" + " retrying start", e);
}
try {
Thread.sleep(60L * 1000L);
@@ -152,14 +289,15 @@ public class HiveServer2 extends Composi
System.exit(-1);
}
- //NOTE: It is critical to do this here so that log4j is reinitialized
+ // NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
String initLog4jMessage = LogUtils.initHiveLog4j();
LOG.debug(initLog4jMessage);
HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
- //log debug message from "oproc" after log4j initialize properly
+ // log debug message from "oproc" after log4j has initialized properly
LOG.debug(oproc.getDebugMessage().toString());
+
startHiveServer2();
} catch (LogInitializationException e) {
LOG.error("Error initializing log: " + e.getMessage(), e);
@@ -169,6 +307,5 @@ public class HiveServer2 extends Composi
System.exit(-1);
}
}
-
}
Modified: hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java (original)
+++ hive/branches/spark/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java Sat Sep 20 17:34:39 2014
@@ -27,7 +27,11 @@ import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
import org.junit.After;
@@ -83,7 +87,7 @@ public class TestSessionGlobalInitFile e
// set up service and client
HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION,
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION,
initFile.getParentFile().getAbsolutePath());
service = new FakeEmbeddedThriftBinaryCLIService(hiveConf);
service.init(new HiveConf());
Modified: hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/branches/spark/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Sat Sep 20 17:34:39 2014
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
@@ -652,6 +653,17 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+ FileStatus status) throws IOException {
+ TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+ BlockLocation[] locations = getLocations(fs, status);
+ for (BlockLocation location : locations) {
+ offsetBlockMap.put(location.getOffset(), location);
+ }
+ return offsetBlockMap;
+ }
+
+ @Override
public void hflush(FSDataOutputStream stream) throws IOException {
stream.sync();
}
Modified: hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/branches/spark/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Sat Sep 20 17:34:39 2014
@@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -403,6 +404,17 @@ public class Hadoop20SShims extends Hado
}
@Override
+ public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+ FileStatus status) throws IOException {
+ TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+ BlockLocation[] locations = getLocations(fs, status);
+ for (BlockLocation location : locations) {
+ offsetBlockMap.put(location.getOffset(), location);
+ }
+ return offsetBlockMap;
+ }
+
+ @Override
public void hflush(FSDataOutputStream stream) throws IOException {
stream.sync();
}
Modified: hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/branches/spark/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Sat Sep 20 17:34:39 2014
@@ -29,6 +29,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -511,6 +512,17 @@ public class Hadoop23Shims extends Hadoo
}
@Override
+ public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+ FileStatus status) throws IOException {
+ TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+ BlockLocation[] locations = getLocations(fs, status);
+ for (BlockLocation location : locations) {
+ offsetBlockMap.put(location.getOffset(), location);
+ }
+ return offsetBlockMap;
+ }
+
+ @Override
public void hflush(FSDataOutputStream stream) throws IOException {
stream.hflush();
}
Modified: hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1626482&r1=1626481&r2=1626482&view=diff
==============================================================================
--- hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/branches/spark/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Sat Sep 20 17:34:39 2014
@@ -30,6 +30,7 @@ import java.security.PrivilegedException
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import javax.security.auth.login.LoginException;
@@ -477,6 +478,19 @@ public interface HadoopShims {
FileStatus status) throws IOException;
/**
+ * Converts the block locations returned by getLocations() into a
+ * {@code TreeMap<Long, BlockLocation>} keyed by block offset, by iterating over the
+ * list of block locations. Using a TreeMap from offset to BlockLocation makes it
+ * O(log n) to get a particular block based upon its offset.
+ * @param fs the file system
+ * @param status the file information
+ * @return a {@code TreeMap<Long, BlockLocation>} from each block's offset to its location
+ * @throws IOException
+ */
+ TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+ FileStatus status) throws IOException;
+
+ /**
* Flush and make visible to other users the changes to the given stream.
* @param stream the stream to hflush.
* @throws IOException