You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hive.apache.org by vg...@apache.org on 2015/05/05 19:38:17 UTC
hive git commit: HIVE-8890: HiveServer2 dynamic service discovery: use persistent ephemeral nodes curator recipe (Vaibhav Gumashta reviewed by Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master cccaa5509 -> 652febcda
HIVE-8890: HiveServer2 dynamic service discovery: use persistent ephemeral nodes curator recipe (Vaibhav Gumashta reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/652febcd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/652febcd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/652febcd
Branch: refs/heads/master
Commit: 652febcdab727f39c05d6b5b3c0a6526d254ee0e
Parents: cccaa55
Author: Vaibhav Gumashta <vg...@apache.org>
Authored: Tue May 5 10:37:51 2015 -0700
Committer: Vaibhav Gumashta <vg...@apache.org>
Committed: Tue May 5 10:37:51 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
pom.xml | 5 +
service/pom.xml | 5 +
.../cli/thrift/ThriftBinaryCLIService.java | 1 -
.../apache/hive/service/server/HiveServer2.java | 106 +++++++++++++++----
5 files changed, 97 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f04ce82..5d4dbea 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1410,7 +1410,7 @@ public class HiveConf extends Configuration {
"The port of ZooKeeper servers to talk to.\n" +
"If the list of Zookeeper servers specified in hive.zookeeper.quorum\n" +
"does not contain port numbers, this value is used."),
- HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "600000ms",
+ HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "1200000ms",
new TimeValidator(TimeUnit.MILLISECONDS),
"ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
"if a heartbeat is not sent in the timeout."),
http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index acacf81..1921b06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -512,6 +512,11 @@
<version>${curator.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>${groovy.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/pom.xml
----------------------------------------------------------------------
diff --git a/service/pom.xml b/service/pom.xml
index c5815af..d8e3126 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -91,6 +91,11 @@
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
<!-- intra-project -->
<dependency>
<groupId>org.apache.hive</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index ca1eae6..6c9efba 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -93,7 +93,6 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
// TCP Server
server = new TThreadPoolServer(sargs);
server.setServerEventHandler(serverEventHandler);
- server.serve();
String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index dc2217f..58e8e49 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -23,6 +23,8 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -35,6 +37,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -67,9 +73,11 @@ import org.apache.zookeeper.data.ACL;
*/
public class HiveServer2 extends CompositeService {
private static final Log LOG = LogFactory.getLog(HiveServer2.class);
+ private static CountDownLatch deleteSignal;
private CLIService cliService;
private ThriftCLIService thriftCLIService;
+ private PersistentEphemeralNode znode;
private String znodePath;
private CuratorFramework zooKeeperClient;
private boolean registeredWithZooKeeper = false;
@@ -151,12 +159,19 @@ public class HiveServer2 extends CompositeService {
String instanceURI = getServerInstanceURI(hiveConf);
byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
setUpZooKeeperAuth(hiveConf);
+ int sessionTimeout =
+ (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ int baseSleepTime =
+ (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+ TimeUnit.MILLISECONDS);
+ int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
// Create a CuratorFramework instance to be used as the ZooKeeper client
// Use the zooKeeperAclProvider to create appropriate ACLs
zooKeeperClient =
CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
- .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
+ .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
zooKeeperClient.start();
// Create the parent znodes recursively; ignore if the parent already exists.
try {
@@ -176,18 +191,28 @@ public class HiveServer2 extends CompositeService {
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
- znodePath =
- zooKeeperClient.create().creatingParentsIfNeeded()
- .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8);
+ znode =
+ new PersistentEphemeralNode(zooKeeperClient,
+ PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
+ znode.start();
+ // We'll wait for 120s for node creation
+ long znodeCreationTimeout = 120;
+ if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
+ throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
+ }
setRegisteredWithZooKeeper(true);
+ znodePath = znode.getActualPath();
// Set a watch on the znode
if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
}
LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
- } catch (KeeperException e) {
+ } catch (Exception e) {
LOG.fatal("Unable to create a znode for this server instance", e);
+ if (znode != null) {
+ znode.close();
+ }
throw (e);
}
}
@@ -223,22 +248,33 @@ public class HiveServer2 extends CompositeService {
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
- HiveServer2.this.setRegisteredWithZooKeeper(false);
- // If there are no more active client sessions, stop the server
- if (cliService.getSessionManager().getOpenSessionCount() == 0) {
- LOG.warn("This instance of HiveServer2 has been removed from the list of server "
- + "instances available for dynamic service discovery. "
- + "The last client session has ended - will shutdown now.");
- HiveServer2.this.stop();
+ if (znode != null) {
+ try {
+ znode.close();
+ LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
+ + "The server will be shut down after the last client sesssion completes.");
+ } catch (IOException e) {
+ LOG.error("Failed to close the persistent ephemeral znode", e);
+ } finally {
+ HiveServer2.this.setRegisteredWithZooKeeper(false);
+ // If there are no more active client sessions, stop the server
+ if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+ LOG.warn("This instance of HiveServer2 has been removed from the list of server "
+ + "instances available for dynamic service discovery. "
+ + "The last client session has ended - will shutdown now.");
+ HiveServer2.this.stop();
+ }
+ }
}
- LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
- + "The server will be shut down after the last client sesssion completes.");
}
}
}
private void removeServerInstanceFromZooKeeper() throws Exception {
setRegisteredWithZooKeeper(false);
+ if (znode != null) {
+ znode.close();
+ }
zooKeeperClient.close();
LOG.info("Server instance removed from ZooKeeper.");
}
@@ -359,25 +395,53 @@ public class HiveServer2 extends CompositeService {
HiveConf hiveConf = new HiveConf();
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+ int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
CuratorFramework zooKeeperClient =
CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
zooKeeperClient.start();
List<String> znodePaths =
zooKeeperClient.getChildren().forPath(
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+ List<String> znodePathsUpdated;
// Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
- for (String znodePath : znodePaths) {
+ for (int i = 0; i < znodePaths.size(); i++) {
+ String znodePath = znodePaths.get(i);
+ deleteSignal = new CountDownLatch(1);
if (znodePath.contains("version=" + versionNumber + ";")) {
- LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
- zooKeeperClient.delete().forPath(
+ String fullZnodePath =
ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
- + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath;
+ LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
+ System.out.println("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper");
+ zooKeeperClient.delete().guaranteed().inBackground(new DeleteCallBack())
+ .forPath(fullZnodePath);
+ // Wait for the delete to complete
+ deleteSignal.await();
+ // Get the updated path list
+ znodePathsUpdated =
+ zooKeeperClient.getChildren().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+ // Gives a list of any new paths that may have been created to maintain the persistent ephemeral node
+ znodePathsUpdated.removeAll(znodePaths);
+ // Add the new paths to the znodes list. We'll try for their removal as well.
+ znodePaths.addAll(znodePathsUpdated);
}
}
zooKeeperClient.close();
}
+ private static class DeleteCallBack implements BackgroundCallback {
+ @Override
+ public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event)
+ throws Exception {
+ if (event.getType() == CuratorEventType.DELETE) {
+ deleteSignal.countDown();
+ }
+ }
+ }
+
public static void main(String[] args) {
HiveConf.setLoadHiveServer2Config(true);
try {
@@ -547,6 +611,8 @@ public class HiveServer2 extends CompositeService {
} catch (Exception e) {
LOG.fatal("Error deregistering HiveServer2 instances for version: " + versionNumber
+ " from ZooKeeper", e);
+ System.out.println("Error deregistering HiveServer2 instances for version: " + versionNumber
+ + " from ZooKeeper." + e);
System.exit(-1);
}
System.exit(0);