You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/03/16 06:01:40 UTC

[1/2] hive git commit: HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 300433537 -> 21c6a5407


http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
new file mode 100644
index 0000000..6514d98
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import static org.apache.hive.service.server.HiveServer2.INSTANCE_URI_CONFIG;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hive.service.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * ZooKeeper backed registry for HiveServer2 instances running in Active/Passive HA mode.
+ * <p>
+ * Server side instances ({@code isClient == false}) register a service record under the instances path and take
+ * part in a Curator {@link LeaderLatch}. On leadership changes the record is re-published with either an
+ * {@link #ACTIVE_ENDPOINT} (leader) or a {@link #PASSIVE_ENDPOINT} (standby). Client side instances only populate
+ * the instance cache so that the current leader can be looked up via {@link #getLeader()}.
+ */
+public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instance> implements
+  ServiceRegistry<HiveServer2Instance>, HiveServer2HAInstanceSet {
+  private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistry.class);
+  static final String ACTIVE_ENDPOINT = "activeEndpoint";
+  static final String PASSIVE_ENDPOINT = "passiveEndpoint";
+  private static final String SASL_LOGIN_CONTEXT_NAME = "HS2ActivePassiveHAZooKeeperClient";
+  private static final String INSTANCE_PREFIX = "instance-";
+  private static final String INSTANCE_GROUP = "instances";
+  private static final String LEADER_LATCH_PATH = "/_LEADER";
+  private final LeaderLatch leaderLatch;
+  // Service record registered by a server side instance in start(); unused (null) for clients.
+  private ServiceRecord srv;
+  private final boolean isClient;
+
+  // There are 2 paths under which the instances get registered
+  // 1) Standard path used by ZkRegistryBase where all instances register themselves (also stores metadata)
+  // Secure: /hs2ActivePassiveHA-sasl/instances/instance-0000000000
+  // Unsecure: /hs2ActivePassiveHA-unsecure/instances/instance-0000000000
+  // 2) Leader latch path used for HS2 HA Active/Passive configuration where all instances register under _LEADER
+  //    path but only one among them is the leader
+  // Secure: /hs2ActivePassiveHA-sasl/_LEADER/xxxx-latch-0000000000
+  // Unsecure: /hs2ActivePassiveHA-unsecure/_LEADER/xxxx-latch-0000000000
+  static HS2ActivePassiveHARegistry create(Configuration conf, boolean isClient) {
+    String zkNameSpace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE);
+    Preconditions.checkArgument(!StringUtils.isBlank(zkNameSpace),
+      HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty");
+    String principal = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+    String keytab = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+    String zkNameSpacePrefix = zkNameSpace + "-";
+    return new HS2ActivePassiveHARegistry(null, zkNameSpacePrefix, LEADER_LATCH_PATH, principal, keytab,
+      SASL_LOGIN_CONTEXT_NAME, conf, isClient);
+  }
+
+  private HS2ActivePassiveHARegistry(final String instanceName, final String zkNamespacePrefix,
+    final String leaderLatchPath,
+    final String krbPrincipal, final String krbKeytab, final String saslContextName, final Configuration conf,
+    final boolean isClient) {
+    super(instanceName, conf, null, zkNamespacePrefix, null, INSTANCE_PREFIX, INSTANCE_GROUP,
+      saslContextName, krbPrincipal, krbKeytab, null);
+    this.isClient = isClient;
+    // Created for clients too, but only started (and therefore only closed) on the server side.
+    leaderLatch = new LeaderLatch(zooKeeperClient, leaderLatchPath, UNIQUE_ID.toString(),
+      LeaderLatch.CloseMode.NOTIFY_LEADER);
+  }
+
+  /**
+   * Starts the registry. Server instances register a passive service record and join the leader election;
+   * clients only populate the instance cache.
+   *
+   * @throws IOException if registration or starting the leader latch fails
+   */
+  @Override
+  public void start() throws IOException {
+    super.start();
+    if (!isClient) {
+      this.srv = getNewServiceRecord();
+      register();
+      leaderLatch.addListener(new HS2LeaderLatchListener());
+      try {
+        // all participating instances use the same latch path, and curator chooses one instance to be leader
+        // which can be verified via leaderLatch.hasLeadership()
+        leaderLatch.start();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+      LOG.info("Registered HS2 with ZK. service record: {}", srv);
+    } else {
+      populateCache();
+    }
+  }
+
+  /**
+   * Registers this instance's service record with a passive endpoint.
+   *
+   * @return value returned by {@code registerServiceRecord}
+   */
+  @Override
+  public String register() throws IOException {
+    updateEndpoint(srv, PASSIVE_ENDPOINT);
+    return registerServiceRecord(srv);
+  }
+
+  /**
+   * Leaves the leader election (if it was joined) and removes this instance's registration.
+   */
+  @Override
+  public void unregister() {
+    closeLeaderLatch();
+    unregisterInternal();
+  }
+
+  private void populateCache() throws IOException {
+    PathChildrenCache pcc = ensureInstancesCache(0);
+    populateCache(pcc, false);
+  }
+
+  @Override
+  public ServiceInstanceSet<HiveServer2Instance> getInstances(final String component, final long clusterReadyTimeoutMs)
+    throws IOException {
+    throw new IOException("Not supported to get instances by component name");
+  }
+
+  /** Publishes a fresh service record carrying the active endpoint and returns it. */
+  private ServiceRecord addActiveEndpointToServiceRecord() throws IOException {
+    return addEndpointToServiceRecord(getNewServiceRecord(), ACTIVE_ENDPOINT);
+  }
+
+  /** Publishes a fresh service record carrying the passive endpoint and returns it. */
+  private ServiceRecord addPassiveEndpointToServiceRecord() throws IOException {
+    return addEndpointToServiceRecord(getNewServiceRecord(), PASSIVE_ENDPOINT);
+  }
+
+  private ServiceRecord addEndpointToServiceRecord(final ServiceRecord record, final String endpointName)
+    throws IOException {
+    updateEndpoint(record, endpointName);
+    updateServiceRecord(record);
+    return record;
+  }
+
+  /**
+   * Adds an IPC endpoint named {@code endpointName} to {@code record}, built from the record's
+   * {@code INSTANCE_URI_CONFIG} value, which must have the form {@code <host>:<port>}.
+   */
+  private void updateEndpoint(final ServiceRecord record, final String endpointName) {
+    final String instanceUri = record.get(INSTANCE_URI_CONFIG);
+    Preconditions.checkState(!StringUtils.isBlank(instanceUri),
+      INSTANCE_URI_CONFIG + " is not set in the service record");
+    // Split on the LAST ':' so that a host part containing ':' (e.g. an IPv6 literal) stays intact.
+    final int portSeparator = instanceUri.lastIndexOf(':');
+    Preconditions.checkArgument(portSeparator > 0 && portSeparator < instanceUri.length() - 1,
+      "Expected <host>:<port> in %s but found '%s'", INSTANCE_URI_CONFIG, instanceUri);
+    final String hostname = instanceUri.substring(0, portSeparator);
+    final int port = Integer.parseInt(instanceUri.substring(portSeparator + 1));
+    Endpoint urlEndpoint = RegistryTypeUtils.ipcEndpoint(endpointName, new InetSocketAddress(hostname, port));
+    record.addInternalEndpoint(urlEndpoint);
+    LOG.info("Added {} endpoint to service record", urlEndpoint);
+  }
+
+  @Override
+  public void stop() {
+    closeLeaderLatch();
+    super.stop();
+  }
+
+  /**
+   * Closes the leader latch only if it is currently started. LeaderLatch#close() rejects a latch that was never
+   * started (client mode) or that is already closed (e.g. unregister() followed by stop()) with an
+   * IllegalStateException, and CloseableUtils#closeQuietly only swallows IOException.
+   */
+  private void closeLeaderLatch() {
+    if (leaderLatch.getState() == LeaderLatch.State.STARTED) {
+      CloseableUtils.closeQuietly(leaderLatch);
+    }
+  }
+
+  /** Builds an instance that is considered active iff the record carries an active endpoint. */
+  @Override
+  protected HiveServer2Instance createServiceInstance(final ServiceRecord srv) throws IOException {
+    Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT);
+    return new HiveServer2Instance(srv, activeEndpoint != null ? ACTIVE_ENDPOINT : PASSIVE_ENDPOINT);
+  }
+
+  @Override
+  public synchronized void registerStateChangeListener(
+    final ServiceInstanceStateChangeListener<HiveServer2Instance> listener)
+    throws IOException {
+    super.registerStateChangeListener(listener);
+  }
+
+  @Override
+  public ApplicationId getApplicationId() throws IOException {
+    throw new IOException("Not supported until HS2 runs as YARN application");
+  }
+
+  @Override
+  protected String getZkPathUser(final Configuration conf) {
+    return RegistryUtils.currentUser();
+  }
+
+  private boolean hasLeadership() {
+    return leaderLatch.hasLeadership();
+  }
+
+  private class HS2LeaderLatchListener implements LeaderLatchListener {
+
+    // leadership state changes and sending out notifications to listener happens inside synchronous method in curator.
+    // Do only lightweight actions in main-event handler thread. Time consuming operations are handled via separate
+    // executor service registered via registerLeaderLatchListener().
+    @Override
+    public void isLeader() {
+      // only leader publishes instance uri as endpoint which will be used by clients to make connections to HS2 via
+      // service discovery.
+      try {
+        if (!hasLeadership()) {
+          LOG.info("isLeader notification received but hasLeadership returned false.. awaiting..");
+          leaderLatch.await();
+        }
+        final ServiceRecord activeRecord = addActiveEndpointToServiceRecord();
+        LOG.info("HS2 instance in ACTIVE mode. Service record: {}", activeRecord);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for the thread delivering this notification.
+        Thread.currentThread().interrupt();
+        throw new ServiceException("Unable to add active endpoint to service record", e);
+      } catch (Exception e) {
+        throw new ServiceException("Unable to add active endpoint to service record", e);
+      }
+    }
+
+    @Override
+    public void notLeader() {
+      try {
+        if (hasLeadership()) {
+          // NOTE(review): await() waits until this instance HAS leadership, so when hasLeadership() is already
+          // true it returns immediately; kept as-is to preserve existing behavior.
+          LOG.info("notLeader notification received but hasLeadership returned true.. awaiting..");
+          leaderLatch.await();
+        }
+        final ServiceRecord passiveRecord = addPassiveEndpointToServiceRecord();
+        LOG.info("HS2 instance lost leadership. Switched to PASSIVE standby mode. Service record: {}",
+          passiveRecord);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt status for the thread delivering this notification.
+        Thread.currentThread().interrupt();
+        throw new ServiceException("Unable to add passive endpoint to service record", e);
+      } catch (Exception e) {
+        throw new ServiceException("Unable to add passive endpoint to service record", e);
+      }
+    }
+  }
+
+  /**
+   * @return the first known instance that reports itself as leader, or {@code null} if none does
+   */
+  @Override
+  public HiveServer2Instance getLeader() {
+    for (HiveServer2Instance hs2Instance : getAll()) {
+      if (hs2Instance.isLeader()) {
+        return hs2Instance;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Collection<HiveServer2Instance> getAll() {
+    return getAllInternal();
+  }
+
+  /**
+   * @return the instance whose worker identity equals {@code instanceId}, or {@code null} if none matches
+   */
+  @Override
+  public HiveServer2Instance getInstance(final String instanceId) {
+    for (HiveServer2Instance hs2Instance : getAll()) {
+      if (hs2Instance.getWorkerIdentity().equals(instanceId)) {
+        return hs2Instance;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Set<HiveServer2Instance> getByHost(final String host) {
+    return getByHostInternal(host);
+  }
+
+  @Override
+  public int size() {
+    return sizeInternal();
+  }
+
+  /**
+   * If leadership related notifications is desired, use this method to register leader latch listener.
+   *
+   * @param latchListener   - listener
+   * @param executorService - event handler executor service
+   */
+  void registerLeaderLatchListener(final LeaderLatchListener latchListener, final ExecutorService executorService) {
+    leaderLatch.addListener(latchListener, executorService);
+  }
+
+  /** Collects the configuration values that are published in this instance's service record. */
+  private Map<String, String> getConfsToPublish() {
+    final Map<String, String> confsToPublish = new HashMap<>();
+    // Hostname
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname));
+    // Hostname:port
+    confsToPublish.put(INSTANCE_URI_CONFIG, conf.get(INSTANCE_URI_CONFIG));
+    confsToPublish.put(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
+    // Transport mode
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname));
+    // Transport specific confs
+    if (HiveServer2.isHTTPTransportMode(conf)) {
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname));
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname));
+    } else {
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_SASL_QOP.varname));
+    }
+    // Auth specific confs
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname));
+    if (HiveServer2.isKerberosAuthMode(conf)) {
+      confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname,
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname));
+    }
+    // SSL conf
+    confsToPublish.put(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname,
+      conf.get(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL.varname));
+    return confsToPublish;
+  }
+
+  /** Builds a new service record populated with {@link #getConfsToPublish()} (no endpoints yet). */
+  private ServiceRecord getNewServiceRecord() {
+    final ServiceRecord record = new ServiceRecord();
+    for (Map.Entry<String, String> entry : getConfsToPublish().entrySet()) {
+      record.set(entry.getKey(), entry.getValue());
+    }
+    return record;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java
new file mode 100644
index 0000000..512d4e8
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistryClient.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Process-wide cache of client side {@link HS2ActivePassiveHARegistry} instances, keyed by ZooKeeper root namespace.
+ */
+public class HS2ActivePassiveHARegistryClient {
+  private static final Logger LOG = LoggerFactory.getLogger(HS2ActivePassiveHARegistryClient.class);
+  // Only accessed from the static synchronized getClient(), i.e. guarded by the class monitor.
+  private static final Map<String, HS2ActivePassiveHARegistry> hs2Registries = new HashMap<>();
+
+  /**
+   * Helper method to get a started {@link HS2ActivePassiveHARegistry} instance to read from the registry. Only used
+   * by clients (JDBC) and service discovery to connect to the active HS2 instance in Active/Passive HA configuration.
+   * The registry is created on first use per namespace and cached afterwards.
+   *
+   * @param conf {@link Configuration} instance which contains service registry information.
+   * @return started HS2ActivePassiveHARegistry for the configured namespace
+   * @throws IOException if a newly created registry fails to start
+   * @throws IllegalArgumentException if the active/passive HA registry namespace is blank
+   */
+  public static synchronized HS2ActivePassiveHARegistry getClient(Configuration conf) throws IOException {
+    String namespace = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE);
+    Preconditions.checkArgument(!StringUtils.isBlank(namespace),
+      HiveConf.ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE.varname + " cannot be null or empty");
+    // The "-" suffix mirrors the namespace prefix HS2ActivePassiveHARegistry.create() builds from the same setting.
+    String nsKey = ZkRegistryBase.getRootNamespace(null, namespace + "-");
+    HS2ActivePassiveHARegistry registry = hs2Registries.get(nsKey);
+    if (registry != null) {
+      LOG.debug("Returning cached registry client for nsKey: {}", nsKey);
+      return registry;
+    }
+    registry = HS2ActivePassiveHARegistry.create(conf, true);
+    try {
+      registry.start();
+    } catch (IOException | RuntimeException e) {
+      // The failed registry is not cached, so release its resources here instead of leaking them.
+      stopQuietly(registry, e);
+      throw e;
+    }
+    hs2Registries.put(nsKey, registry);
+    return registry;
+  }
+
+  /** Stops {@code registry}, attaching any failure to {@code cause} as a suppressed exception. */
+  private static void stopQuietly(HS2ActivePassiveHARegistry registry, Exception cause) {
+    try {
+      registry.stop();
+    } catch (RuntimeException suppressed) {
+      cause.addSuppressed(suppressed);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index b7ece2b..5b792ac 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -27,10 +27,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -38,6 +39,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -45,8 +47,10 @@ import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -69,6 +73,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
 import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -86,6 +91,8 @@ import org.apache.hive.service.cli.CLIService;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.hive.service.servlet.HS2LeadershipStatus;
+import org.apache.hive.service.servlet.HS2Peers;
 import org.apache.hive.service.servlet.QueryProfileServlet;
 import org.apache.logging.log4j.util.Strings;
 import org.apache.zookeeper.CreateMode;
@@ -101,6 +108,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * HiveServer2.
@@ -109,16 +117,27 @@ import com.google.common.collect.Lists;
 public class HiveServer2 extends CompositeService {
   private static CountDownLatch deleteSignal;
   private static final Logger LOG = LoggerFactory.getLogger(HiveServer2.class);
+  public static final String INSTANCE_URI_CONFIG = "hive.server2.instance.uri";
+  private static final int SHUTDOWN_TIME = 60;
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
   private PersistentEphemeralNode znode;
-  private String znodePath;
   private CuratorFramework zooKeeperClient;
   private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
   private HttpServer webServer; // Web UI
   private TezSessionPoolManager tezSessionPoolManager;
   private WorkloadManager wm;
   private PamAuthenticator pamAuthenticator;
+  private Map<String, String> confsToPublish = new HashMap<String, String>();
+  private String serviceUri;
+  private boolean serviceDiscovery;
+  private boolean activePassiveHA;
+  private LeaderLatchListener leaderLatchListener;
+  private ExecutorService leaderActionsExecutorService;
+  private HS2ActivePassiveHARegistry hs2HARegistry;
+  private Hive sessionHive;
+  private String wmQueue;
+  private AtomicBoolean isLeader = new AtomicBoolean(false);
 
   public HiveServer2() {
     super(HiveServer2.class.getSimpleName());
@@ -140,13 +159,6 @@ public class HiveServer2 extends CompositeService {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
       }
-
-      // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session.
-      tezSessionPoolManager = TezSessionPoolManager.getInstance();
-      tezSessionPoolManager.initTriggers(hiveConf);
-      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
-        tezSessionPoolManager.setupPool(hiveConf);
-      }
     } catch (Throwable t) {
       LOG.warn("Could not initiate the HiveServer2 Metrics system.  Metrics may not be reported.", t);
     }
@@ -191,15 +203,12 @@ public class HiveServer2 extends CompositeService {
       LlapRegistryService.getClient(hiveConf);
     }
 
-    Hive sessionHive = null;
     try {
       sessionHive = Hive.get(hiveConf);
     } catch (HiveException e) {
       throw new RuntimeException("Failed to get metastore connection", e);
     }
 
-    initializeWorkloadManagement(hiveConf, sessionHive);
-
     // Create views registry
     HiveMaterializedViewsRegistry.get().init();
 
@@ -212,10 +221,17 @@ public class HiveServer2 extends CompositeService {
       }
     }
 
+    wmQueue = hiveConf.get(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE.varname, "").trim();
+
+    this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY);
+    this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE);
+
     // Setup web UI
+    final int webUIPort;
+    final String webHost;
     try {
-      int webUIPort =
-          hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT);
+      webUIPort = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT);
+      webHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST);
       // We disable web UI in tests unless the test is explicitly setting a
       // unique web ui port so that we don't mess up ptests.
       boolean uiDisabledInTest = hiveConf.getBoolVar(ConfVars.HIVE_IN_TEST) &&
@@ -229,7 +245,7 @@ public class HiveServer2 extends CompositeService {
           LOG.info("Starting Web UI on port "+ webUIPort);
           HttpServer.Builder builder = new HttpServer.Builder("hiveserver2");
           builder.setPort(webUIPort).setConf(hiveConf);
-          builder.setHost(hiveConf.getVar(ConfVars.HIVE_SERVER2_WEBUI_BIND_HOST));
+          builder.setHost(webHost);
           builder.setMaxThreads(
             hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_THREADS));
           builder.setAdmins(hiveConf.getVar(ConfVars.USERS_IN_ADMIN_ROLE));
@@ -281,6 +297,12 @@ public class HiveServer2 extends CompositeService {
               throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_WEBUI_USE_SSL.varname + " has false value. It is recommended to set to true when PAM is used.");
             }
           }
+          if (serviceDiscovery && activePassiveHA) {
+            builder.setContextAttribute("hs2.isLeader", isLeader);
+            builder.setContextAttribute("hiveconf", hiveConf);
+            builder.addServlet("leader", HS2LeadershipStatus.class);
+            builder.addServlet("peers", HS2Peers.class);
+          }
           builder.addServlet("llap", LlapServlet.class);
           builder.addServlet("jdbcjar", JdbcJarDownloadServlet.class);
           builder.setContextRootRewriteTarget("/hiveserver2.jsp");
@@ -292,57 +314,25 @@ public class HiveServer2 extends CompositeService {
     } catch (IOException ie) {
       throw new ServiceException(ie);
     }
-    // Add a shutdown hook for catching SIGTERM & SIGINT
-    ShutdownHookManager.addShutdownHook(new Runnable() {
-      @Override
-      public void run() {
-        hiveServer2.stop();
-      }
-    });
-  }
 
-  private void initializeWorkloadManagement(HiveConf hiveConf, Hive sessionHive) {
-    String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE);
-    boolean hasQueue = wmQueue != null && !wmQueue.isEmpty();
-    WMFullResourcePlan resourcePlan;
     try {
-      resourcePlan = sessionHive.getActiveResourcePlan();
-    } catch (Throwable e) {
-      if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST_SSL)) {
-        throw new RuntimeException(e);
-      } else {
-        resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured.
-      }
-    }
-    if (hasQueue && resourcePlan == null
-        && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
-      LOG.info("Creating a default resource plan for test");
-      resourcePlan = createTestResourcePlan();
-    }
-    if (resourcePlan == null) {
-      if (!hasQueue) {
-        LOG.info("Workload management is not enabled and there's no resource plan");
-        return; // TODO: we could activate it anyway, similar to the below; in case someone
-                //       wants to activate a resource plan for Tez triggers only w/o restart.
-      }
-      LOG.warn("Workload management is enabled but there's no resource plan");
-    }
-
-    if (hasQueue) {
-      // Initialize workload management.
-      LOG.info("Initializing workload management");
-      try {
-        wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new ServiceException("Unable to instantiate Workload Manager", e);
+      if (serviceDiscovery) {
+        serviceUri = getServerInstanceURI();
+        addConfsToPublish(hiveConf, confsToPublish, serviceUri);
+        if (activePassiveHA) {
+          hiveConf.set(INSTANCE_URI_CONFIG, serviceUri);
+          leaderLatchListener = new HS2LeaderLatchListener(this, SessionState.get());
+          leaderActionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Leader Actions Handler Thread").build());
+          hs2HARegistry = HS2ActivePassiveHARegistry.create(hiveConf, false);
+        }
       }
+    } catch (Exception e) {
+      throw new ServiceException(e);
     }
 
-    if (resourcePlan != null) {
-      tezSessionPoolManager.updateTriggers(resourcePlan);
-      LOG.info("Updated tez session pool manager with active resource plan: {}",
-          resourcePlan.getPlan().getName());
-    }
+    // Add a shutdown hook for catching SIGTERM & SIGINT
+    ShutdownHookManager.addShutdownHook(() -> hiveServer2.stop());
   }
 
   private WMFullResourcePlan createTestResourcePlan() {
@@ -356,10 +346,10 @@ public class HiveServer2 extends CompositeService {
     return resourcePlan;
   }
 
-  public static boolean isHTTPTransportMode(HiveConf hiveConf) {
+  public static boolean isHTTPTransportMode(Configuration hiveConf) {
     String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
     if (transportMode == null) {
-      transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
+      transportMode = hiveConf.get(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
     }
     if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
       return true;
@@ -367,8 +357,8 @@ public class HiveServer2 extends CompositeService {
     return false;
   }
 
-  public static boolean isKerberosAuthMode(HiveConf hiveConf) {
-    String authMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION);
+  public static boolean isKerberosAuthMode(Configuration hiveConf) {
+    String authMode = hiveConf.get(ConfVars.HIVE_SERVER2_AUTHENTICATION.varname);
     if (authMode != null && (authMode.equalsIgnoreCase("KERBEROS"))) {
       return true;
     }
@@ -408,7 +398,7 @@ public class HiveServer2 extends CompositeService {
    * @param hiveConf
    * @throws Exception
    */
-  private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+  private void addServerInstanceToZooKeeper(HiveConf hiveConf, Map<String, String> confsToPublish) throws Exception {
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
     String instanceURI = getServerInstanceURI();
@@ -449,8 +439,8 @@ public class HiveServer2 extends CompositeService {
       if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) {
         // HiveServer2 configs that this instance will publish to ZooKeeper,
         // so that the clients can read these and configure themselves properly.
-        Map<String, String> confsToPublish = new HashMap<String, String>();
-        addConfsToPublish(hiveConf, confsToPublish);
+
+        addConfsToPublish(hiveConf, confsToPublish, instanceURI);
         // Publish configs for this instance as the data on the node
         znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
       } else {
@@ -467,7 +457,7 @@ public class HiveServer2 extends CompositeService {
         throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
       }
       setDeregisteredWithZooKeeper(false);
-      znodePath = znode.getActualPath();
+      final String znodePath = znode.getActualPath();
       // Set a watch on the znode
       if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
         // No node exists, throw exception
@@ -487,10 +477,12 @@ public class HiveServer2 extends CompositeService {
    * Add conf keys, values that HiveServer2 will publish to ZooKeeper.
    * @param hiveConf
    */
-  private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish) {
+  private void addConfsToPublish(HiveConf hiveConf, Map<String, String> confsToPublish, String serviceUri) {
     // Hostname
     confsToPublish.put(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname,
         hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST));
+    // Hostname:port
+    confsToPublish.put(INSTANCE_URI_CONFIG, serviceUri);
     // Transport mode
     confsToPublish.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
         hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE));
@@ -540,6 +532,11 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
+  /**
+   * Returns the current value of this instance's leader flag. The flag is set to true/false by
+   * {@code HS2LeaderLatchListener#isLeader()} / {@code #notLeader()} when Active/Passive HA leadership changes.
+   *
+   * @return whether this HiveServer2 instance currently considers itself the leader
+   */
+  public boolean isLeader() {
+    return isLeader.get();
+  }
+
+
   /**
    * The watcher class which sets the de-register flag when the znode corresponding to this server
    * instance is deleted. Additionally, it shuts down the server if there are no more active client
@@ -609,10 +606,16 @@ public class HiveServer2 extends CompositeService {
     super.start();
     // If we're supporting dynamic service discovery, we'll add the service uri for this
     // HiveServer2 instance to Zookeeper as a znode.
-    HiveConf hiveConf = this.getHiveConf();
-    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+    HiveConf hiveConf = getHiveConf();
+    if (serviceDiscovery) {
       try {
-        addServerInstanceToZooKeeper(hiveConf);
+        if (activePassiveHA) {
+          hs2HARegistry.registerLeaderLatchListener(leaderLatchListener, leaderActionsExecutorService);
+          hs2HARegistry.start();
+          LOG.info("HS2 HA registry started");
+        } else {
+          addServerInstanceToZooKeeper(hiveConf, confsToPublish);
+        }
       } catch (Exception e) {
         LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e);
         throw new ServiceException(e);
@@ -627,22 +630,125 @@ public class HiveServer2 extends CompositeService {
         throw new ServiceException(e);
       }
     }
+
+    if (!activePassiveHA) {
+      LOG.info("HS2 interactive HA not enabled. Starting tez sessions..");
+      startOrReconnectTezSessions();
+    } else {
+      LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader.");
+    }
+  }
+
+  /**
+   * {@link LeaderLatchListener} that starts/reconnects Tez sessions when this HS2 instance becomes the
+   * leader and stops/disconnects them when it loses leadership. Also keeps the owning server's
+   * {@code isLeader} flag in sync with the notifications it receives.
+   */
+  private static class HS2LeaderLatchListener implements LeaderLatchListener {
+    private final HiveServer2 hiveServer2;
+    // Installed as the current SessionState before Tez sessions are started; may be null.
+    private final SessionState parentSession;
+
+    HS2LeaderLatchListener(final HiveServer2 hs2, final SessionState parentSession) {
+      this.hiveServer2 = hs2;
+      this.parentSession = parentSession;
+    }
+
+    // Leadership status changes happen inside the synchronized method LeaderLatch.setLeadership().
+    // Notifications are handled on a single-threaded executor service, which preserves their ordering:
+    // if leadership changes while Tez sessions are being created, the notLeader notification is queued
+    // in the executor service and handled afterwards.
+    @Override
+    public void isLeader() {
+      LOG.info("HS2 instance {} became the LEADER. Starting/Reconnecting tez sessions..", hiveServer2.serviceUri);
+      hiveServer2.isLeader.set(true);
+      if (parentSession != null) {
+        SessionState.setCurrentSessionState(parentSession);
+      }
+      hiveServer2.startOrReconnectTezSessions();
+      LOG.info("Started/Reconnected tez sessions.");
+    }
+
+    @Override
+    public void notLeader() {
+      LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
+      hiveServer2.isLeader.set(false);
+      hiveServer2.stopOrDisconnectTezSessions();
+      LOG.info("Stopped/Disconnected tez sessions.");
+    }
+  }
+
+  private void startOrReconnectTezSessions() {
+    LOG.info("Starting/Reconnecting tez sessions..");
+    // TODO: add tez session reconnect after TEZ-3875
+    WMFullResourcePlan resourcePlan = null;
+    if (!StringUtils.isEmpty(wmQueue)) {
+      try {
+        resourcePlan = sessionHive.getActiveResourcePlan();
+      } catch (HiveException e) {
+        if (!HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST_SSL)) {
+          throw new RuntimeException(e);
+        } else {
+          resourcePlan = null; // Ignore errors in SSL tests where the connection is misconfigured.
+        }
+      }
+
+      if (resourcePlan == null && HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_IN_TEST)) {
+        LOG.info("Creating a default resource plan for test");
+        resourcePlan = createTestResourcePlan();
+      }
+    }
+    initAndStartTezSessionPoolManager(resourcePlan);
+    initAndStartWorkloadManager(resourcePlan);
+  }
+
+  private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
+    // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid
+    // SessionState.get() return null during createTezDir
+    try {
+      // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session.
+      LOG.info("Initializing tez session pool manager");
+      tezSessionPoolManager = TezSessionPoolManager.getInstance();
+      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+        tezSessionPoolManager.setupPool(hiveConf);
+      }
+      tezSessionPoolManager.startPool(hiveConf, resourcePlan);
+      LOG.info("Tez session pool manager initialized.");
+    } catch (Exception e) {
+      throw new ServiceException("Unable to setup tez session pool", e);
+    }
+  }
+
+  /**
+   * Creates and starts the workload manager for {@code wmQueue}. Does nothing except log when no
+   * interactive queue is configured.
+   *
+   * @param resourcePlan active workload-management resource plan; may be null
+   * @throws ServiceException if the workload manager cannot be created or started
+   */
+  private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) {
+    if (StringUtils.isEmpty(wmQueue)) {
+      LOG.info("Workload management is not enabled.");
+      return;
+    }
+    // Initialize workload management.
+    LOG.info("Initializing workload management");
+    try {
+      wm = WorkloadManager.create(wmQueue, hiveConf, resourcePlan);
+      wm.start();
+      LOG.info("Workload manager initialized.");
+    } catch (Exception e) {
+      throw new ServiceException("Unable to instantiate and start Workload Manager", e);
+    }
+  }
+
+  private void stopOrDisconnectTezSessions() {
+    LOG.info("Stoppping/Disconnecting tez sessions.");
+    // There should already be an instance of the session pool manager.
+    // If not, ignoring is fine while stopping HiveServer2.
     if (tezSessionPoolManager != null) {
       try {
-        tezSessionPoolManager.startPool();
-        LOG.info("Started tez session pool manager..");
+        tezSessionPoolManager.stop();
+        LOG.info("Stopped tez session pool manager.");
       } catch (Exception e) {
-        LOG.error("Error starting tez session pool manager: ", e);
-        throw new ServiceException(e);
+        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+          + "Shutting down HiveServer2 anyway.", e);
       }
     }
     if (wm != null) {
       try {
-        wm.start();
-        LOG.info("Started workload manager..");
+        wm.stop();
+        LOG.info("Stopped workload manager.");
       } catch (Exception e) {
-        LOG.error("Error starting workload manager", e);
-        throw new ServiceException(e);
+        LOG.error("Workload manager stop had an error during stop of HiveServer2. "
+          + "Shutting down HiveServer2 anyway.", e);
       }
     }
   }
@@ -652,6 +758,12 @@ public class HiveServer2 extends CompositeService {
     LOG.info("Shutting down HiveServer2");
     HiveConf hiveConf = this.getHiveConf();
     super.stop();
+    if (hs2HARegistry != null) {
+      hs2HARegistry.stop();
+      shutdownExecutor(leaderActionsExecutorService);
+      LOG.info("HS2 HA registry stopped");
+      hs2HARegistry = null;
+    }
     if (webServer != null) {
       try {
         webServer.stop();
@@ -670,32 +782,15 @@ public class HiveServer2 extends CompositeService {
       }
     }
     // Remove this server instance from ZooKeeper if dynamic service discovery is set
-    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+    if (serviceDiscovery && !activePassiveHA) {
       try {
         removeServerInstanceFromZooKeeper();
       } catch (Exception e) {
         LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
       }
     }
-    // There should already be an instance of the session pool manager.
-    // If not, ignoring is fine while stopping HiveServer2.
-    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) &&
-      tezSessionPoolManager != null) {
-      try {
-        tezSessionPoolManager.stop();
-      } catch (Exception e) {
-        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
-            + "Shutting down HiveServer2 anyway.", e);
-      }
-    }
-    if (wm != null) {
-      try {
-        wm.stop();
-      } catch (Exception e) {
-        LOG.error("Workload manager stop had an error during stop of HiveServer2. "
-            + "Shutting down HiveServer2 anyway.", e);
-      }
-    }
+
+    stopOrDisconnectTezSessions();
 
     if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       try {
@@ -706,6 +801,22 @@ public class HiveServer2 extends CompositeService {
     }
   }
 
+  /**
+   * Shuts down the given executor.
+   *
+   * Waits up to {@code SHUTDOWN_TIME} seconds for already-submitted tasks to finish, then forces
+   * termination with {@link ExecutorService#shutdownNow()}. Forced termination also happens if the
+   * wait is interrupted; in that case the interrupt status is restored.
+   *
+   * @param leaderActionsExecutorService executor to shut down
+   */
+  private void shutdownExecutor(final ExecutorService leaderActionsExecutorService) {
+    leaderActionsExecutorService.shutdown();
+    try {
+      if (!leaderActionsExecutorService.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
+        LOG.warn("Executor service did not terminate in the specified time {} sec", SHUTDOWN_TIME);
+        List<Runnable> droppedTasks = leaderActionsExecutorService.shutdownNow();
+        LOG.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting up to {} sec for executor service to terminate.", SHUTDOWN_TIME, e);
+      List<Runnable> droppedTasks = leaderActionsExecutorService.shutdownNow();
+      LOG.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
+      // Restore the interrupt status so code further up the stack can observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+  }
+
   @VisibleForTesting
   public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
     if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java
new file mode 100644
index 0000000..b31d63c
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2HAInstanceSet.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+
+/**
+ * Set of {@link HiveServer2Instance}s registered for HiveServer2 Active/Passive high availability,
+ * extended with a lookup for the current leader.
+ */
+public interface HiveServer2HAInstanceSet extends ServiceInstanceSet<HiveServer2Instance> {
+
+  /**
+   * In Active/Passive setup, returns current active leader.
+   *
+   * @return leader instance
+   */
+  HiveServer2Instance getLeader();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
new file mode 100644
index 0000000..558e809
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2Instance.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.registry.impl.ServiceInstanceBase;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A HiveServer2 instance as advertised in the Active/Passive HA service registry.
+ *
+ * In addition to the base service-instance fields, it records whether the instance is the leader, its
+ * Thrift transport mode and, for HTTP transport, its HTTP path.
+ */
+public class HiveServer2Instance extends ServiceInstanceBase {
+  private boolean isLeader;
+  private String transportMode;
+  // HTTP path when transportMode is "http"; empty string otherwise (when built from a ServiceRecord).
+  private String httpEndpoint;
+
+  // empty c'tor to make jackson happy
+  public HiveServer2Instance() {
+
+  }
+
+  /**
+   * Builds an instance from a registry service record.
+   *
+   * @param srv service record read from the registry
+   * @param endPointName name of the endpoint passed to the base class
+   * @throws IOException if the base class fails to parse the record
+   * @throws IllegalArgumentException if the record has both an active and a passive endpoint
+   */
+  public HiveServer2Instance(final ServiceRecord srv, final String endPointName) throws IOException {
+    super(srv, endPointName);
+
+    Endpoint activeEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.ACTIVE_ENDPOINT);
+    Endpoint passiveEndpoint = srv.getInternalEndpoint(HS2ActivePassiveHARegistry.PASSIVE_ENDPOINT);
+    this.isLeader = activeEndpoint != null;
+    Preconditions.checkArgument(activeEndpoint == null || passiveEndpoint == null,
+      "Incorrect service record. Both active and passive endpoints cannot be non-null!");
+    this.transportMode = srv.get(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname);
+    // Constant-first comparison: a record without a transport mode must not cause an NPE.
+    if ("http".equalsIgnoreCase(transportMode)) {
+      this.httpEndpoint = srv.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname);
+    } else {
+      this.httpEndpoint = "";
+    }
+  }
+
+  public boolean isLeader() {
+    return isLeader;
+  }
+
+  public String getTransportMode() {
+    return transportMode;
+  }
+
+  public String getHttpEndpoint() {
+    return httpEndpoint;
+  }
+
+  public void setLeader(final boolean leader) {
+    isLeader = leader;
+  }
+
+  public void setTransportMode(final String transportMode) {
+    this.transportMode = transportMode;
+  }
+
+  public void setHttpEndpoint(final String httpEndpoint) {
+    this.httpEndpoint = httpEndpoint;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    HiveServer2Instance other = (HiveServer2Instance) o;
+    return super.equals(o) && isLeader == other.isLeader
+      && Objects.equals(transportMode, other.transportMode)
+      && Objects.equals(httpEndpoint, other.httpEndpoint);
+  }
+
+  @Override
+  public int hashCode() {
+    // Uses the same fields as equals(), so equal instances have equal hash codes.
+    return Objects.hash(super.hashCode(), isLeader, transportMode, httpEndpoint);
+  }
+
+  @Override
+  public String toString() {
+    String result = "instanceId: " + getWorkerIdentity() + " isLeader: " + isLeader + " host: " + getHost() +
+      " port: " + getRpcPort() + " transportMode: " + transportMode;
+    // Non-http instances carry an empty endpoint; only print it when it is meaningful.
+    if (httpEndpoint != null && !httpEndpoint.isEmpty()) {
+      result += " httpEndpoint: " + httpEndpoint;
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
new file mode 100644
index 0000000..921a23e
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/servlet/HS2LeadershipStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.servlet;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reports whether this HS2 instance is the current Active/Passive HA leader.
+ *
+ * Always responds with HTTP 200. The body is the JSON serialization of the {@code hs2.isLeader}
+ * servlet-context attribute: {@code true} or {@code false}, or {@code null} if the attribute is not set.
+ */
+public class HS2LeadershipStatus extends HttpServlet {
+  private static final Logger LOG = LoggerFactory.getLogger(HS2LeadershipStatus.class);
+  // ObjectMapper is thread-safe once configured, so one instance is shared across requests.
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    ServletContext ctx = getServletContext();
+    AtomicBoolean isLeader = (AtomicBoolean) ctx.getAttribute("hs2.isLeader");
+    LOG.info("Returning isLeader: {}", isLeader);
+    // Status and content type must be set before the body is written: writing may commit the response.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType("application/json");
+    MAPPER.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), isLeader);
+    response.flushBuffer();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/servlet/HS2Peers.java b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
new file mode 100644
index 0000000..a51bbeb
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/servlet/HS2Peers.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.servlet;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * Returns, as JSON, all HS2 instances registered in the Active/Passive HA registry.
+ */
+public class HS2Peers extends HttpServlet {
+  // ObjectMapper is thread-safe once configured, so it is built once and shared across requests.
+  private static final ObjectMapper MAPPER = createMapper();
+
+  /**
+   * JSON wrapper around the collection of registered HS2 instances.
+   */
+  public static class HS2Instances {
+    private Collection<HiveServer2Instance> hiveServer2Instances;
+
+    // empty c'tor to make jackson happy
+    public HS2Instances() {
+    }
+
+    public HS2Instances(final Collection<HiveServer2Instance> hiveServer2Instances) {
+      this.hiveServer2Instances = hiveServer2Instances;
+    }
+
+    public Collection<HiveServer2Instance> getHiveServer2Instances() {
+      return hiveServer2Instances;
+    }
+
+    public void setHiveServer2Instances(final Collection<HiveServer2Instance> hiveServer2Instances) {
+      this.hiveServer2Instances = hiveServer2Instances;
+    }
+  }
+
+  /** Creates the shared mapper with the same configuration previously applied on every request. */
+  private static ObjectMapper createMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+    // Only setter auto-detection is disabled here; getter and field detection keep Jackson's defaults.
+    mapper.setVisibilityChecker(mapper.getSerializationConfig().getDefaultVisibilityChecker()
+      .withSetterVisibility(JsonAutoDetect.Visibility.NONE));
+    return mapper;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
+    ServletContext ctx = getServletContext();
+    HiveConf hiveConf = (HiveConf) ctx.getAttribute("hiveconf");
+    HS2ActivePassiveHARegistry hs2Registry = HS2ActivePassiveHARegistryClient.getClient(hiveConf);
+    HS2Instances instances = new HS2Instances(hs2Registry.getAll());
+    // Status and content type must be set before the body is written: writing may commit the response.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType("application/json");
+    MAPPER.writerWithDefaultPrettyPrinter().writeValue(response.getWriter(), instances);
+    response.flushBuffer();
+  }
+}


[2/2] hive git commit: HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Posted by pr...@apache.org.
HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21c6a540
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21c6a540
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21c6a540

Branch: refs/heads/master
Commit: 21c6a5407cebc5a096cd9aa10157be05a3ea9627
Parents: 3004335
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Mar 15 23:01:25 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 15 23:01:25 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +
 itests/hive-minikdc/pom.xml                     |  20 ++
 itests/hive-unit-hadoop2/pom.xml                |  21 +-
 .../apache/hive/jdbc/TestActivePassiveHA.java   | 268 +++++++++++++++
 .../hive/jdbc/miniHS2/AbstractHiveService.java  |   8 +-
 .../org/apache/hive/jdbc/miniHS2/MiniHS2.java   |   7 +
 .../hive/llap/registry/ServiceRegistry.java     |  11 +-
 .../registry/impl/LlapFixedRegistryImpl.java    |   4 +-
 .../llap/registry/impl/LlapRegistryService.java |   5 +-
 .../impl/LlapZookeeperRegistryImpl.java         |  12 +-
 .../hive/llap/security/LlapTokenClient.java     |   3 +-
 .../hadoop/hive/registry/RegistryUtilities.java |  52 +++
 .../hadoop/hive/registry/ServiceInstance.java   |  18 +-
 .../hive/registry/ServiceInstanceSet.java       |  12 +-
 .../hive/registry/impl/ServiceInstanceBase.java |  57 +++-
 .../hive/registry/impl/TezAmInstance.java       |  21 +-
 .../hive/registry/impl/TezAmRegistryImpl.java   |   8 +-
 .../hive/registry/impl/ZkRegistryBase.java      | 242 +++++++++-----
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |   5 +-
 .../daemon/services/impl/LlapWebServices.java   |   5 +-
 .../tezplugins/LlapTaskSchedulerService.java    |  10 +
 .../hive/ql/exec/tez/TezSessionPoolManager.java |   9 +-
 .../hive/ql/exec/tez/TezSessionPoolSession.java |  11 +-
 .../hive/ql/exec/tez/TezSessionState.java       |   5 +-
 .../apache/hadoop/hive/ql/exec/tez/Utils.java   |   3 +-
 .../physical/LlapClusterStateForCompile.java    |   3 +-
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  13 +-
 service/pom.xml                                 |  43 ++-
 .../server/HS2ActivePassiveHARegistry.java      | 325 +++++++++++++++++++
 .../HS2ActivePassiveHARegistryClient.java       |  54 +++
 .../apache/hive/service/server/HiveServer2.java | 313 ++++++++++++------
 .../server/HiveServer2HAInstanceSet.java        |  29 ++
 .../service/server/HiveServer2Instance.java     | 108 ++++++
 .../service/servlet/HS2LeadershipStatus.java    |  48 +++
 .../apache/hive/service/servlet/HS2Peers.java   |  75 +++++
 35 files changed, 1559 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8bbf1be..06efd02 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1916,6 +1916,10 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS),
         "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
         "if a heartbeat is not sent in the timeout."),
+    HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "ZooKeeper client's connection timeout in seconds. Connection timeout * hive.zookeeper.connection.max.retries\n" +
+        "with exponential backoff is when curator client deems connection is lost to zookeeper."),
     HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
         "The parent node under which all ZooKeeper nodes are created."),
     HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
@@ -2465,6 +2469,13 @@ public class HiveConf extends Configuration {
         "If true, the HiveServer2 WebUI will be secured with PAM."),
 
     // Tez session settings
+    HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false,
+      "Whether HiveServer2 Active/Passive High Availability is enabled when Hive Interactive sessions are enabled.\n" +
+        "This also requires hive.server2.support.dynamic.service.discovery to be enabled."),
+    HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace",
+      "hs2ActivePassiveHA",
+      "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" +
+        "instances with zookeeper"),
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-minikdc/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml
index 337535a..1e40d9d 100644
--- a/itests/hive-minikdc/pom.xml
+++ b/itests/hive-minikdc/pom.xml
@@ -117,6 +117,26 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <!-- keep these transitive artifacts of tez-dag off the test classpath -->
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index fb31fd4..85a6145 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -53,7 +53,26 @@
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <!-- keep these transitive artifacts of tez-dag off the test classpath -->
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- dependencies are always listed in sorted order by groupId, artifectId -->
     <!-- test intra-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
new file mode 100644
index 0000000..26acbd7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.apache.hive.service.servlet.HS2Peers;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration test for HiveServer2 active/passive HA. Two MiniHS2 instances register with an
+ * embedded ZooKeeper ({@link TestingServer}); the test checks that exactly one is leader at a
+ * time, that leadership moves to the survivor when the leader stops, and that a restarted
+ * instance rejoins as standby.
+ */
+public class TestActivePassiveHA {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static TestingServer zkServer;
+
+  private MiniHS2 miniHS2_1 = null;
+  private MiniHS2 miniHS2_2 = null;
+  private HiveConf hiveConf1;
+  private HiveConf hiveConf2;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    MiniHS2.cleanupLocalDir();
+    zkServer = new TestingServer();
+    Class.forName(MiniHS2.getJdbcDriverName());
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (zkServer != null) {
+      zkServer.close();
+      zkServer = null;
+    }
+    MiniHS2.cleanupLocalDir();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hiveConf1 = createHAConf();
+    miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build();
+    hiveConf2 = createHAConf();
+    miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if ((miniHS2_1 != null) && miniHS2_1.isStarted()) {
+      miniHS2_1.stop();
+    }
+    if ((miniHS2_2 != null) && miniHS2_2.isStarted()) {
+      miniHS2_2.stop();
+    }
+  }
+
+  /** Creates a HiveConf for one MiniHS2 instance with ZK discovery and active/passive HA. */
+  private static HiveConf createHAConf() {
+    HiveConf conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    setHAConfigs(conf);
+    return conf;
+  }
+
+  /** Points {@code conf} at the embedded ZooKeeper and enables active/passive HA. */
+  private static void setHAConfigs(Configuration conf) {
+    conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
+    conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString());
+    final String zkRootNamespace = "hs2test";
+    conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace);
+    conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
+    conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS);
+    conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS);
+    conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1);
+  }
+
+  @Test(timeout = 60000)
+  public void testActivePassive() throws Exception {
+    Map<String, String> confOverlay = new HashMap<>();
+    hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_1.start(confOverlay);
+    waitUntilStarted(miniHS2_1);
+
+    hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_2.start(confOverlay);
+    waitUntilStarted(miniHS2_2);
+
+    // The first instance started is expected to be leader, the second standby.
+    assertTrue(miniHS2_1.isLeader());
+    assertEquals("true", sendGet(webUrl(hiveConf1, "/leader")));
+    assertFalse(miniHS2_2.isLeader());
+    assertEquals("false", sendGet(webUrl(hiveConf2, "/leader")));
+    assertPeers(hiveConf1, 2, thriftPort(hiveConf1));
+
+    Configuration conf = new Configuration();
+    setHAConfigs(conf);
+    HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    assertRegistry(client, 1, 1);
+
+    // Stop the leader. miniHS2_2 is already started, so wait for the failover itself
+    // (leadership) rather than for isStarted(), which would not wait at all.
+    miniHS2_1.stop();
+    while (!miniHS2_2.isLeader()) {
+      Thread.sleep(100);
+    }
+    assertEquals("true", sendGet(webUrl(hiveConf2, "/leader")));
+
+    waitForRegistrySize(client, 1);
+    client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    assertRegistry(client, 1, 0);
+    assertPeers(hiveConf2, 1, thriftPort(hiveConf2));
+
+    // Restart the first instance; it must rejoin as standby.
+    hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_1.start(confOverlay);
+    waitUntilStarted(miniHS2_1);
+    assertFalse(miniHS2_1.isLeader());
+    assertEquals("false", sendGet(webUrl(hiveConf1, "/leader")));
+
+    waitForRegistrySize(client, 2);
+    client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    assertRegistry(client, 1, 1);
+    assertPeers(hiveConf1, 2, thriftPort(hiveConf2));
+  }
+
+  /** Polls until {@code hs2} reports started; bounded by the test timeout. */
+  private static void waitUntilStarted(MiniHS2 hs2) throws Exception {
+    while (!hs2.isStarted()) {
+      Thread.sleep(100);
+    }
+  }
+
+  /** Polls until the registry lists {@code size} instances; bounded by the test timeout. */
+  private static void waitForRegistrySize(HS2ActivePassiveHARegistry client, int size)
+      throws Exception {
+    while (client.getAll().size() != size) {
+      Thread.sleep(100);
+    }
+  }
+
+  /** Asserts the registry lists exactly the given numbers of leader and standby instances. */
+  private static void assertRegistry(HS2ActivePassiveHARegistry client, int expectedLeaders,
+      int expectedStandby) throws Exception {
+    List<HiveServer2Instance> instances = new ArrayList<>(client.getAll());
+    assertEquals(expectedLeaders + expectedStandby, instances.size());
+    int leaders = 0;
+    for (HiveServer2Instance instance : instances) {
+      if (instance.isLeader()) {
+        leaders++;
+      }
+    }
+    assertEquals(expectedLeaders, leaders);
+    assertEquals(expectedStandby, instances.size() - leaders);
+  }
+
+  /**
+   * Reads /peers from the web UI configured in {@code queried} and asserts that it lists
+   * {@code expectedCount} instances, of which only the one on thrift port {@code leaderPort}
+   * is leader.
+   */
+  private static void assertPeers(HiveConf queried, int expectedCount, int leaderPort)
+      throws Exception {
+    HS2Peers.HS2Instances hs2Peers =
+        OBJECT_MAPPER.readValue(sendGet(webUrl(queried, "/peers")), HS2Peers.HS2Instances.class);
+    assertEquals(expectedCount, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      assertEquals(hsi.getRpcPort() == leaderPort, hsi.isLeader());
+    }
+  }
+
+  /** Returns the URL of {@code path} on the local web UI port configured in {@code conf}. */
+  private static String webUrl(HiveConf conf, String path) {
+    return "http://localhost:" + conf.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + path;
+  }
+
+  /** Returns the thrift (binary) port configured in {@code conf}. */
+  private static int thriftPort(HiveConf conf) {
+    return Integer.parseInt(conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+  }
+
+  /**
+   * Issues an HTTP GET and returns the body with line terminators stripped. The reader and the
+   * connection are released even if reading fails.
+   */
+  private static String sendGet(String url) throws Exception {
+    HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection();
+    try {
+      con.setRequestMethod("GET");
+      StringBuilder response = new StringBuilder();
+      try (BufferedReader in = new BufferedReader(
+          new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+        String inputLine;
+        while ((inputLine = in.readLine()) != null) {
+          response.append(inputLine);
+        }
+      }
+      return response.toString();
+    } finally {
+      con.disconnect();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
index 6cab8cd..d21b764 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
@@ -35,14 +35,16 @@ public abstract class AbstractHiveService {
   private String hostname;
   private int binaryPort;
   private int httpPort;
+  private int webPort;
   private boolean startedHiveService = false;
   private List<String> addedProperties = new ArrayList<String>();
 
-  public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) {
+  public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) {
     this.hiveConf = hiveConf;
     this.hostname = hostname;
     this.binaryPort = binaryPort;
     this.httpPort = httpPort;
+    this.webPort = webPort;
   }
 
   /**
@@ -136,6 +138,10 @@ public abstract class AbstractHiveService {
     return httpPort;
   }
 
+  public int getWebPort() {
+    return webPort;
+  }
+
   public boolean isStarted() {
     return startedHiveService;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 8bbf8a4..997726c 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -217,6 +217,8 @@ public class MiniHS2 extends AbstractHiveService {
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils
             .findFreePort()),
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils
+            .findFreePort()),
+        (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils
             .findFreePort()));
     hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l);
     hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10,
@@ -306,6 +308,7 @@ public class MiniHS2 extends AbstractHiveService {
     hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost());
     hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort());
     hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort());
+    hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort());
 
     Path scratchDir = new Path(baseFsDir, "scratch");
     // Create root scratchdir with write all, so that user impersonation has no issues.
@@ -404,6 +407,10 @@ public class MiniHS2 extends AbstractHiveService {
   }
 
 
+  public boolean isLeader() {
+    return hiveServer2.isLeader();
+  }
+
   public CLIServiceClient getServiceClient() {
     verifyStarted();
     return getServiceClientInternal();

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 5d7f813..6178b4b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,13 +14,16 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+
+import org.apache.hadoop.hive.registry.ServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /**
  * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
  */
-public interface ServiceRegistry {
+public interface ServiceRegistry<T extends ServiceInstance> {
 
   /**
    * Start the service registry
@@ -49,14 +52,14 @@ public interface ServiceRegistry {
    * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
    *                              started yet. 0 means do not wait.
    */
-  LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
+  ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws
+    IOException;
 
   /**
    * Adds state change listeners for service instances.
    * @param listener - state change listener
    */
-  void registerStateChangeListener(
-      ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException;
+  void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException;
 
   /**
    * @return The application ID of the LLAP cluster.

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index c88198f..f99d86c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -267,8 +268,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
   }
 
   @Override
-  public void registerStateChangeListener(
-      final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) {
+  public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException {
     // nothing to set
     LOG.warn("Callbacks for instance state changes are not supported in fixed registry.");
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 80a6aba..3bda40b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -35,7 +36,7 @@ public class LlapRegistryService extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
 
-  private ServiceRegistry registry = null;
+  private ServiceRegistry<LlapServiceInstance> registry = null;
   private final boolean isDaemon;
   private boolean isDynamic = false;
   private String identity = "(pending)";
@@ -136,7 +137,7 @@ public class LlapRegistryService extends AbstractService {
   }
 
   public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
-    return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
+    return (LlapServiceInstanceSet) this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
   }
 
   public void registerStateChangeListener(

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8339230..f5d6202 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LlapZookeeperRegistryImpl
-    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry {
+    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> {
   private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
 
   /**
@@ -65,8 +65,6 @@ public class LlapZookeeperRegistryImpl
   private static final String IPC_LLAP = "llap";
   private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
   private final static String NAMESPACE_PREFIX = "llap-";
-  private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String WORKER_PREFIX = "worker-";
   private static final String SLOT_PREFIX = "slot-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
 
@@ -79,7 +77,7 @@ public class LlapZookeeperRegistryImpl
   public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
     super(instanceName, conf,
         HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
-        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
         LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null,
         HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE),
@@ -225,7 +223,7 @@ public class LlapZookeeperRegistryImpl
 
     @Override
     public String toString() {
-      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
+      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() +
           " with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
           ", servicesAddress=" + getServicesAddress() +  ", mgmtPort=" + getManagementPort() + "]";
     }
@@ -327,9 +325,9 @@ public class LlapZookeeperRegistryImpl
       if (data == null) continue;
       String nodeName = extractNodeName(childData);
       if (nodeName.startsWith(WORKER_PREFIX)) {
-        Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath());
+        LlapServiceInstance instances = getInstanceByPath(childData.getPath());
         if (instances != null) {
-          unsorted.addAll(instances);
+          unsorted.add(instances);
         }
       } else if (nodeName.startsWith(SLOT_PREFIX)) {
         slotByWorker.put(extractWorkerIdFromSlot(childData),

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 32d5caa..3208e21 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -55,7 +56,7 @@ public class LlapTokenClient {
   private final SocketFactory socketFactory;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
-  private LlapServiceInstanceSet activeInstances;
+  private ServiceInstanceSet<LlapServiceInstance> activeInstances;
   private Collection<LlapServiceInstance> lastKnownInstances;
   private LlapManagementProtocolClientImpl client;
   private LlapServiceInstance clientInstance;

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
new file mode 100644
index 0000000..e069e43
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+/** Static helpers for local host names and unique identifiers used by registry code. */
+public class RegistryUtilities {
+  private static final String LOCALHOST = "localhost";
+
+  /**
+   * @return the local host name, or {@code "localhost"} if it cannot be resolved
+   */
+  public static String getHostName() {
+    String name = LOCALHOST;
+    try {
+      name = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException ignored) {
+      // keep the "localhost" fallback
+    }
+    return name;
+  }
+
+  /**
+   * @return the local host's FQDN (reverse DNS lookup), or {@code "localhost"} on failure
+   */
+  public static String getCanonicalHostName() {
+    String name = LOCALHOST;
+    try {
+      name = InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException ignored) {
+      // keep the "localhost" fallback
+    }
+    return name;
+  }
+
+  /** @return a newly generated random UUID rendered as a string */
+  public static String getUUID() { return UUID.randomUUID().toString(); }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
index 908b3bb..4493e99 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
@@ -21,27 +21,27 @@ public interface ServiceInstance {
    * Worker identity is a UUID (unique across restarts), to identify a node which died &amp; was brought
    * back on the same host/port
    */
-  public abstract String getWorkerIdentity();
+  String getWorkerIdentity();
 
   /**
    * Hostname of the service instance
    * 
-   * @return
+   * @return service hostname
    */
-  public abstract String getHost();
+  String getHost();
 
   /**
    * RPC Endpoint for service instance
-   * 
-   * @return
+   *
+   * @return rpc port
    */
-  public int getRpcPort();
+  int getRpcPort();
 
   /**
    * Config properties of the Service Instance (llap.daemon.*)
-   * 
-   * @return
+   *
+   * @return properties
    */
-  public abstract Map<String, String> getProperties();
+  Map<String, String> getProperties();
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
index 34fba5c..63178cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
@@ -29,15 +29,15 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
    * The worker identity does not collide between restarts, so each restart will have a unique id,
    * while having the same host/ip pair.
    * 
-   * @return
+   * @return instance list
    */
   Collection<InstanceType> getAll();
 
   /**
    * Get an instance by worker identity.
    * 
-   * @param name
-   * @return
+   * @param name worker id
+   * @return instance
    */
   InstanceType getInstance(String name);
 
@@ -46,13 +46,13 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
    * 
    * The list could include dead and alive instances.
    * 
-   * @param host
-   * @return
+   * @param host hostname
+   * @return instance list
    */
   Set<InstanceType> getByHost(String host);
 
   /**
-   * Get number of instances in the currently availabe.
+   * Get the number of currently available instances.
    *
    * @return - number of instances
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
index db3d788..de8910c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
@@ -15,6 +15,8 @@ package org.apache.hadoop.hive.registry.impl;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
+
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
@@ -25,26 +27,27 @@ import org.slf4j.LoggerFactory;
 
 public class ServiceInstanceBase implements ServiceInstance {
   private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class);
+  private String host;
+  private int rpcPort;
+  private String workerIdentity;
+  private Map<String, String> properties;
 
-  protected final ServiceRecord srv;
-  protected final String host;
-  protected final int rpcPort;
+  // empty c'tor to make jackson happy
+  public ServiceInstanceBase() {
 
-  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
-    this.srv = srv;
+  }
 
+  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Working with ServiceRecord: {}", srv);
     }
-
     final Endpoint rpc = srv.getInternalEndpoint(rpcName);
-
-    this.host =
-        RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-            AddressTypes.ADDRESS_HOSTNAME_FIELD);
-    this.rpcPort =
-        Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-            AddressTypes.ADDRESS_PORT_FIELD));
+    this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+        AddressTypes.ADDRESS_HOSTNAME_FIELD);
+    this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+        AddressTypes.ADDRESS_PORT_FIELD));
+    this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    this.properties = srv.attributes();
   }
 
   @Override
@@ -57,17 +60,19 @@ public class ServiceInstanceBase implements ServiceInstance {
     }
 
     ServiceInstanceBase other = (ServiceInstanceBase) o;
-    return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+    return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity())
+      && Objects.equals(host, other.host)
+      && Objects.equals(rpcPort, other.rpcPort);
   }
 
   @Override
   public int hashCode() {
-    return getWorkerIdentity().hashCode();
+    return Objects.hash(getWorkerIdentity(), host, rpcPort);
   }
 
   @Override
   public String getWorkerIdentity() {
-    return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    return workerIdentity;
   }
 
   @Override
@@ -82,12 +87,28 @@ public class ServiceInstanceBase implements ServiceInstance {
 
   @Override
   public Map<String, String> getProperties() {
-    return srv.attributes();
+    return properties;
+  }
+
+  public void setHost(final String host) {
+    this.host = host;
+  }
+
+  public void setRpcPort(final int rpcPort) {
+    this.rpcPort = rpcPort;
+  }
+
+  public void setWorkerIdentity(final String workerIdentity) {
+    this.workerIdentity = workerIdentity;
+  }
+
+  public void setProperties(final Map<String, String> properties) {
+    this.properties = properties;
   }
 
   @Override
   public String toString() {
     return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host="
-        + host + ":" + rpcPort + "]";
+      + host + ":" + rpcPort + "]";
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index 0724cf5..d09cb24 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -13,29 +13,26 @@
  */
 package org.apache.hadoop.hive.registry.impl;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
 import org.apache.commons.codec.binary.Base64;
-
-import com.google.common.io.ByteStreams;
-
-import org.apache.tez.common.security.JobTokenIdentifier;
-
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
 
 public class TezAmInstance extends ServiceInstanceBase {
   private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class);
   private final int pluginPort;
   private Token<JobTokenIdentifier> token;
 
-  public TezAmInstance(ServiceRecord srv) throws IOException {
+  TezAmInstance(ServiceRecord srv) throws IOException {
     super(srv, TezAmRegistryImpl.IPC_TEZCLIENT);
     final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN);
     if (plugin != null) {
@@ -76,7 +73,7 @@ public class TezAmInstance extends ServiceInstanceBase {
 
   @Override
   public String toString() {
-    return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort +
+    return "TezAmInstance [" + getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() +
         ", pluginPort=" + pluginPort + ", token=" + token + "]";
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index 417e571..ab02cf4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -37,21 +37,19 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
   static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
       AM_PLUGIN_JOBID = "am.plugin.jobid";
   private final static String NAMESPACE_PREFIX = "tez-am-";
-  private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String WORKER_PREFIX = "worker-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
 
   private final String registryName;
 
-  public static TezAmRegistryImpl create(Configuration conf, boolean b) {
+  public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
     String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
     return StringUtils.isBlank(amRegistryName) ? null
-        : new TezAmRegistryImpl(amRegistryName, conf, true);
+        : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk);
   }
 
 
   private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) {
-    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
         useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null,
         HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL),
         HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE),

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 17269dd..e7227a8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -13,13 +13,7 @@
  */
 package org.apache.hadoop.hive.registry.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -34,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -44,12 +39,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.registry.RegistryUtilities;
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
@@ -65,6 +63,12 @@ import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This is currently used for implementation inheritance only; it doesn't provide a unified flow
  * into which one can just plug a few abstract method implementations, because providing one with
@@ -77,16 +81,18 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class);
   private final static String SASL_NAMESPACE = "sasl";
   private final static String UNSECURE_NAMESPACE = "unsecure";
-
-  static final String UNIQUE_IDENTIFIER = "registry.unique.id";
-  private static final UUID uniq = UUID.randomUUID();
+  protected final static String USER_SCOPE_PATH_PREFIX = "user-";
+  protected static final String WORKER_PREFIX = "worker-";
+  protected static final String WORKER_GROUP = "workers";
+  public static final String UNIQUE_IDENTIFIER = "registry.unique.id";
+  protected static final UUID UNIQUE_ID = UUID.randomUUID();
+  private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls();
 
   protected final Configuration conf;
   protected final CuratorFramework zooKeeperClient;
-  // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
   // workersPath is the directory path where all the worker znodes are located.
   protected final String workersPath;
-  private final String userPathPrefix, workerNodePrefix;
+  private final String workerNodePrefix;
 
   protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
 
@@ -99,7 +105,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private final String disableMessage;
 
   private final Lock instanceCacheLock = new ReentrantLock();
-  private final Map<String, Set<InstanceType>> pathToInstanceCache;
+  // there can be only one instance per path
+  private final Map<String, InstanceType> pathToInstanceCache;
+  // there can be multiple instances per node
   private final Map<String, Set<InstanceType>> nodeToInstanceCache;
 
   // The registration znode.
@@ -109,29 +117,22 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private PathChildrenCache instancesCache; // Created on demand.
 
   /** Local hostname. */
-  protected static final String hostname;
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
+  protected static final String hostname = RegistryUtilities.getCanonicalHostName();
 
   /**
    * @param rootNs A single root namespace override. Not recommended.
-   * @param nsPrefix The namespace prefix to use with default namespaces.
+   * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure'
+   *                 to namespace prefix to get effective root namespace).
    * @param userScopePathPrefix The prefix to use for the user-specific part of the path.
    * @param workerPrefix The prefix to use for each worker znode.
+   * @param workerGroup group name to use for all workers
    * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed.
    * @param zkPrincipal ZK security principal.
    * @param zkKeytab ZK security keytab.
    * @param aclsConfig A config setting to use to determine if ACLs should be verified.
    */
   public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix,
-      String userScopePathPrefix, String workerPrefix,
+      String userScopePathPrefix, String workerPrefix, String workerGroup,
       String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) {
     this.conf = new Configuration(conf);
     this.saslLoginContextName = zkSaslLoginContextName;
@@ -145,29 +146,52 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       this.disableMessage = "";
     }
     this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    String zkEnsemble = getQuorumServers(this.conf);
     this.encoder = new RegistryUtils.ServiceRecordMarshal();
-    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
-        ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
-        ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
-    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
 
     // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
     // worker-0000000 is the sequence number which will be retained until session timeout. If a
     // worker does not respond due to communication interruptions it will retain the same sequence
     // number when it returns back. If session timeout expires, the node will be deleted and new
     // addition of the same node (restart) will get next sequence number
-    this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf);
-    this.workerNodePrefix = workerPrefix;
-    this.workersPath =  "/" + userPathPrefix + "/" + instanceName + "/workers";
+    final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf);
+    this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix;
+    this.workersPath =  "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup);
     this.instancesCache = null;
     this.stateChangeListeners = new HashSet<>();
     this.pathToInstanceCache = new ConcurrentHashMap<>();
     this.nodeToInstanceCache = new ConcurrentHashMap<>();
+    final String namespace = getRootNamespace(rootNs, nsPrefix);
+    ACLProvider aclProvider;
+    // get acl provider for most outer path that is non-null
+    if (userPathPrefix == null) {
+      if (instanceName == null) {
+        if (workerGroup == null) {
+          aclProvider = getACLProviderForZKPath(namespace);
+        } else {
+          aclProvider = getACLProviderForZKPath(workerGroup);
+        }
+      } else {
+        aclProvider = getACLProviderForZKPath(instanceName);
+      }
+    } else {
+      aclProvider = getACLProviderForZKPath(userScopePathPrefix);
+    }
+    this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider);
+    this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+  }
+
+  public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) {
+    final boolean isSecure = UserGroupInformation.isSecurityEnabled();
+    String rootNs = userProvidedNamespace;
+    if (rootNs == null) {
+      rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE);
+    }
+    return rootNs;
+  }
 
+  private ACLProvider getACLProviderForZKPath(String zkPath) {
     final boolean isSecure = UserGroupInformation.isSecurityEnabled();
-    ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    return new ACLProvider() {
       @Override
       public List<ACL> getDefaultAcl() {
         // We always return something from getAclForPath so this should not happen.
@@ -177,31 +201,40 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
       @Override
       public List<ACL> getAclForPath(String path) {
-        if (!isSecure || path == null || !path.contains(userPathPrefix)) {
+        if (!isSecure || path == null || !path.contains(zkPath)) {
           // No security or the path is below the user path - full access.
           return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
         }
         return createSecureAcls();
       }
     };
-    if (rootNs == null) {
-      rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path.
-    }
+  }
+
+  private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+    String zkEnsemble = getQuorumServers(conf);
+    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
 
     // Create a CuratorFramework instance to be used as the ZooKeeper client
     // Use the zooKeeperAclProvider to create appropriate ACLs
-    this.zooKeeperClient = CuratorFrameworkFactory.builder()
-        .connectString(zkEnsemble)
-        .sessionTimeoutMs(sessionTimeout)
-        .aclProvider(zooKeeperAclProvider)
-        .namespace(rootNs)
-        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
-        .build();
+    return CuratorFrameworkFactory.builder()
+      .connectString(zkEnsemble)
+      .sessionTimeoutMs(sessionTimeout)
+      .connectionTimeoutMs(connectionTimeout)
+      .aclProvider(zooKeeperAclProvider)
+      .namespace(namespace)
+      .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+      .build();
   }
 
   private static List<ACL> createSecureAcls() {
     // Read all to the world
-    List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+    List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
     // Create/Delete/Write/Admin to creator
     nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
     return nodeAcls;
@@ -211,9 +244,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
    * Get the ensemble server addresses from the configuration. The format is: host1:port,
    * host2:port..
    *
-   * @param conf
+   * @param conf configuration
    **/
-  private String getQuorumServers(Configuration conf) {
+  private static String getQuorumServers(Configuration conf) {
     String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
     String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
         ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
@@ -238,7 +271,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   protected final String registerServiceRecord(ServiceRecord srv) throws IOException {
     // restart sensitive instance id
-    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+    srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
 
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
@@ -275,11 +308,28 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       CloseableUtils.closeQuietly(znode);
       throw (e instanceof IOException) ? (IOException)e : new IOException(e);
     }
-    return uniq.toString();
+    return UNIQUE_ID.toString();
   }
 
+  protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+    try {
+      znode.setData(encoder.toBytes(srv));
+
+      if (doCheckAcls) {
+        try {
+          checkAndSetAcls();
+        } catch (Exception ex) {
+          throw new IOException("Error validating or setting ACLs. " + disableMessage, ex);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to update znode with new service record", e);
+      CloseableUtils.closeQuietly(znode);
+      throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+    }
+  }
 
-  protected final void initializeWithoutRegisteringInternal() throws IOException {
+  final void initializeWithoutRegisteringInternal() throws IOException {
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
       try {
@@ -345,8 +395,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private void addToCache(String path, String host, InstanceType instance) {
     instanceCacheLock.lock();
     try {
-      putInCache(path, pathToInstanceCache, instance);
-      putInCache(host, nodeToInstanceCache, instance);
+      putInInstanceCache(path, pathToInstanceCache, instance);
+      putInNodeCache(host, nodeToInstanceCache, instance);
     } finally {
       instanceCacheLock.unlock();
     }
@@ -368,14 +418,19 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
         path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
   }
 
-  private void putInCache(String key, Map<String, Set<InstanceType>> cache,
+  private void putInInstanceCache(String key, Map<String, InstanceType> cache,
       InstanceType instance) {
+    cache.put(key, instance);
+  }
+
+  private void putInNodeCache(String key, Map<String, Set<InstanceType>> cache,
+    InstanceType instance) {
     Set<InstanceType> instanceSet = cache.get(key);
     if (instanceSet == null) {
-      instanceSet = Sets.newHashSet();
-      cache.put(key, instanceSet);
+      instanceSet = new HashSet<>();
+      instanceSet.add(instance);
     }
-    instanceSet.add(instance);
+    cache.put(key, instanceSet);
   }
 
   protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) {
@@ -403,7 +458,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException;
 
-  protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
+  protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
     if (childData == null) return null;
     byte[] data = childData.getData();
     if (data == null) return null;
@@ -415,8 +470,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
     private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
 
     @Override
-    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
-        throws Exception {
+    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
       Preconditions.checkArgument(client != null
           && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
 
@@ -427,28 +481,32 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
         if (!nodeName.startsWith(workerNodePrefix)) return;
         LOG.info("{} for zknode {}", event.getType(), childData.getPath());
         InstanceType instance = extractServiceInstance(event, childData);
-        int ephSeqVersion = extractSeqNum(nodeName);
-        switch (event.getType()) {
-        case CHILD_ADDED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onCreate(instance, ephSeqVersion);
-          }
-          break;
-        case CHILD_UPDATED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onUpdate(instance, ephSeqVersion);
-          }
-          break;
-        case CHILD_REMOVED:
-          removeFromCache(childData.getPath(), instance.getHost());
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onRemove(instance, ephSeqVersion);
+        if (instance != null) {
+          int ephSeqVersion = extractSeqNum(nodeName);
+          switch (event.getType()) {
+            case CHILD_ADDED:
+              addToCache(childData.getPath(), instance.getHost(), instance);
+              for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+                listener.onCreate(instance, ephSeqVersion);
+              }
+              break;
+            case CHILD_UPDATED:
+              addToCache(childData.getPath(), instance.getHost(), instance);
+              for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+                listener.onUpdate(instance, ephSeqVersion);
+              }
+              break;
+            case CHILD_REMOVED:
+              removeFromCache(childData.getPath(), instance.getHost());
+              for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+                listener.onRemove(instance, ephSeqVersion);
+              }
+              break;
+            default:
+              // Ignore all the other events; logged above.
           }
-          break;
-        default:
-          // Ignore all the other events; logged above.
+        } else {
+          LOG.info("instance is null for event: {} childData: {}", event.getType(), childData);
         }
       }
     }
@@ -464,7 +522,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   protected final Set<InstanceType> getByHostInternal(String host) {
     Set<InstanceType> byHost = nodeToInstanceCache.get(host);
-    byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
+    byHost = (byHost == null) ? Sets.newHashSet() : byHost;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
     }
@@ -472,11 +530,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   }
 
   protected final Collection<InstanceType> getAllInternal() {
-    Set<InstanceType> instances =  new HashSet<>();
-    for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
-      instances.addAll(instanceSet);
-    }
-    return instances;
+    return new HashSet<>(pathToInstanceCache.values());
   }
 
   private static String extractNodeName(ChildData childData) {
@@ -564,13 +618,17 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
     CloseableUtils.class.getName();
   }
 
+  protected void unregisterInternal() {
+    CloseableUtils.closeQuietly(znode);
+  }
+
   public void stop() {
     CloseableUtils.closeQuietly(znode);
     CloseableUtils.closeQuietly(instancesCache);
     CloseableUtils.closeQuietly(zooKeeperClient);
   }
 
-  protected final Set<InstanceType> getInstancesByPath(String path) {
+  protected final InstanceType getInstanceByPath(String path) {
     return pathToInstanceCache.get(path);
   }
 
@@ -588,4 +646,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       throw e;
     }
   }
+
+  // for debugging
+  private class ZkConnectionStateListener implements ConnectionStateListener {
+    @Override
+    public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+      LOG.info("Connection state change notification received. State: {}", connectionState);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 0120639..3aec46b 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
@@ -343,7 +344,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
   private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
     InetAddress address = InetAddress.getByName(host);
-    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
     LlapServiceInstance serviceInstance = null;
 
     // The name used in the service registry may not match the host name we're using.
@@ -375,7 +376,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
 
   private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException {
-    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
     LlapServiceInstance serviceInstance = null;
 
     LOG.info("Finding random live service instance");

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 58bf8dc..b944fad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
@@ -230,7 +232,8 @@ public class LlapWebServices extends AbstractService {
           }
           jg.writeStringField("identity", registry.getWorkerIdentity());
           jg.writeArrayFieldStart("peers");
-          for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) {
+          ServiceInstanceSet<LlapServiceInstance> instanceSet = registry.getInstances();
+          for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) {
             jg.writeStartObject();
             jg.writeStringField("identity", s.getWorkerIdentity());
             jg.writeStringField("host", s.getHost());

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66de3b8..6ddecca 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,16 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 46cfe56..a051f90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -98,13 +98,18 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   protected TezSessionPoolManager() {
   }
 
-  public void startPool() throws Exception {
+  public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception {
     if (defaultSessionPool != null) {
       defaultSessionPool.start();
     }
     if (expirationTracker != null) {
       expirationTracker.start();
     }
+    initTriggers(conf);
+    if (resourcePlan != null) {
+      updateTriggers(resourcePlan);
+      LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
+    }
   }
 
   public void setupPool(HiveConf conf) throws Exception {
@@ -157,8 +162,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
     llapQueue = new Semaphore(numConcurrentLlapQueries, true);
 
-    initTriggers(conf);
-
     String queueAllowedStr = HiveConf.getVar(initConf,
         ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index d1b3fec..d3748ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,25 +18,20 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.Collection;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.security.auth.login.LoginException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 import org.apache.tez.dag.api.TezException;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index b98fb58..046ea19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -738,7 +738,10 @@ public class TezSessionState {
   private Path createTezDir(String sessionId, String suffix) throws IOException {
     // tez needs its own scratch dir (per session)
     // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
-    Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
+    SessionState sessionState = SessionState.get();
+    String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState
+      .getHdfsScratchDirURIString();
+    Path tezDir = new Path(hdfsScratchDir, TEZ_DIR);
     tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix)));
     FileSystem fs = tezDir.getFileSystem(conf);
     FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index bc438bb..1b7321b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -52,7 +53,7 @@ public class Utils {
       LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
 
       Collection<LlapServiceInstance> serviceInstances =
-          serviceRegistry.getInstances().getAllInstancesOrdered(true);
+        serviceRegistry.getInstances().getAllInstancesOrdered(true);
       Preconditions.checkArgument(!serviceInstances.isEmpty(),
           "No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
       ArrayList<String> locations = new ArrayList<>(serviceInstances.size());

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index a8d729d..0d1990a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +113,7 @@ public class LlapClusterStateForCompile {
           return false; // Don't fail; this is best-effort.
         }
       }
-      LlapServiceInstanceSet instances;
+      ServiceInstanceSet<LlapServiceInstance> instances;
       try {
         instances = svc.getInstances(10);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index d261623..d5b683f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.Before;
@@ -90,7 +93,7 @@ public class TestTezSessionPool {
 
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null);
       // this is now a LIFO operation
 
       // draw 1 and replace
@@ -153,7 +156,7 @@ public class TestTezSessionPool {
 
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null);
       TezSessionState[] sessions = new TezSessionState[12];
       int[] queueCounts = new int[3];
       for (int i = 0; i < sessions.length; ++i) {
@@ -234,7 +237,7 @@ public class TestTezSessionPool {
       conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2);
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null);
     } catch (Exception e) {
       LOG.error("Initialization error", e);
       fail();
@@ -295,7 +298,7 @@ public class TestTezSessionPool {
     try {
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null);
     } catch (Exception e) {
       e.printStackTrace();
       fail();

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/pom.xml
----------------------------------------------------------------------
diff --git a/service/pom.xml b/service/pom.xml
index 9ad7555..e3774df 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -264,7 +264,48 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+          </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>