You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/03/16 06:01:41 UTC

[2/2] hive git commit: HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)

HIVE-18281: HiveServer2 HA for LLAP and Workload Manager (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21c6a540
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21c6a540
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21c6a540

Branch: refs/heads/master
Commit: 21c6a5407cebc5a096cd9aa10157be05a3ea9627
Parents: 3004335
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu Mar 15 23:01:25 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu Mar 15 23:01:25 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  11 +
 itests/hive-minikdc/pom.xml                     |  20 ++
 itests/hive-unit-hadoop2/pom.xml                |  21 +-
 .../apache/hive/jdbc/TestActivePassiveHA.java   | 268 +++++++++++++++
 .../hive/jdbc/miniHS2/AbstractHiveService.java  |   8 +-
 .../org/apache/hive/jdbc/miniHS2/MiniHS2.java   |   7 +
 .../hive/llap/registry/ServiceRegistry.java     |  11 +-
 .../registry/impl/LlapFixedRegistryImpl.java    |   4 +-
 .../llap/registry/impl/LlapRegistryService.java |   5 +-
 .../impl/LlapZookeeperRegistryImpl.java         |  12 +-
 .../hive/llap/security/LlapTokenClient.java     |   3 +-
 .../hadoop/hive/registry/RegistryUtilities.java |  52 +++
 .../hadoop/hive/registry/ServiceInstance.java   |  18 +-
 .../hive/registry/ServiceInstanceSet.java       |  12 +-
 .../hive/registry/impl/ServiceInstanceBase.java |  57 +++-
 .../hive/registry/impl/TezAmInstance.java       |  21 +-
 .../hive/registry/impl/TezAmRegistryImpl.java   |   8 +-
 .../hive/registry/impl/ZkRegistryBase.java      | 242 +++++++++-----
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |   5 +-
 .../daemon/services/impl/LlapWebServices.java   |   5 +-
 .../tezplugins/LlapTaskSchedulerService.java    |  10 +
 .../hive/ql/exec/tez/TezSessionPoolManager.java |   9 +-
 .../hive/ql/exec/tez/TezSessionPoolSession.java |  11 +-
 .../hive/ql/exec/tez/TezSessionState.java       |   5 +-
 .../apache/hadoop/hive/ql/exec/tez/Utils.java   |   3 +-
 .../physical/LlapClusterStateForCompile.java    |   3 +-
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  13 +-
 service/pom.xml                                 |  43 ++-
 .../server/HS2ActivePassiveHARegistry.java      | 325 +++++++++++++++++++
 .../HS2ActivePassiveHARegistryClient.java       |  54 +++
 .../apache/hive/service/server/HiveServer2.java | 313 ++++++++++++------
 .../server/HiveServer2HAInstanceSet.java        |  29 ++
 .../service/server/HiveServer2Instance.java     | 108 ++++++
 .../service/servlet/HS2LeadershipStatus.java    |  48 +++
 .../apache/hive/service/servlet/HS2Peers.java   |  75 +++++
 35 files changed, 1559 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8bbf1be..06efd02 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1916,6 +1916,10 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS),
         "ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, \n" +
         "if a heartbeat is not sent in the timeout."),
+    HIVE_ZOOKEEPER_CONNECTION_TIMEOUT("hive.zookeeper.connection.timeout", "15s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "ZooKeeper client's connection timeout in seconds. Connection timeout * hive.zookeeper.connection.max.retries\n" +
+        "with exponential backoff is when curator client deems connection is lost to zookeeper."),
     HIVE_ZOOKEEPER_NAMESPACE("hive.zookeeper.namespace", "hive_zookeeper_namespace",
         "The parent node under which all ZooKeeper nodes are created."),
     HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES("hive.zookeeper.clean.extra.nodes", false,
@@ -2465,6 +2469,13 @@ public class HiveConf extends Configuration {
         "If true, the HiveServer2 WebUI will be secured with PAM."),
 
     // Tez session settings
+    HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false,
+      "Whether HiveServer2 Active/Passive High Availability should be enabled when Hive Interactive sessions are enabled.\n" +
+        "This will also require hive.server2.support.dynamic.service.discovery to be enabled."),
+    HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace",
+      "hs2ActivePassiveHA",
+      "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" +
+        "instances with zookeeper"),
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-minikdc/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-minikdc/pom.xml b/itests/hive-minikdc/pom.xml
index 337535a..1e40d9d 100644
--- a/itests/hive-minikdc/pom.xml
+++ b/itests/hive-minikdc/pom.xml
@@ -117,6 +117,26 @@
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- test inter-project -->
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit-hadoop2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit-hadoop2/pom.xml b/itests/hive-unit-hadoop2/pom.xml
index fb31fd4..85a6145 100644
--- a/itests/hive-unit-hadoop2/pom.xml
+++ b/itests/hive-unit-hadoop2/pom.xml
@@ -53,7 +53,26 @@
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- dependencies are always listed in sorted order by groupId, artifectId -->
     <!-- test intra-project -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
new file mode 100644
index 0000000..26acbd7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
+import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
+import org.apache.hive.service.server.HiveServer2Instance;
+import org.apache.hive.service.servlet.HS2Peers;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestActivePassiveHA {
+  private MiniHS2 miniHS2_1 = null;
+  private MiniHS2 miniHS2_2 = null;
+  private static TestingServer zkServer;
+  private Connection hs2Conn = null;
+  private HiveConf hiveConf1;
+  private HiveConf hiveConf2;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    MiniHS2.cleanupLocalDir();
+    zkServer = new TestingServer();
+    Class.forName(MiniHS2.getJdbcDriverName());
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (zkServer != null) {
+      zkServer.close();
+      zkServer = null;
+    }
+    MiniHS2.cleanupLocalDir();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hiveConf1 = new HiveConf();
+    hiveConf1.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    // Set up zookeeper dynamic service discovery configs
+    setHAConfigs(hiveConf1);
+    miniHS2_1 = new MiniHS2.Builder().withConf(hiveConf1).cleanupLocalDirOnStartup(false).build();
+    hiveConf2 = new HiveConf();
+    hiveConf2.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    // Set up zookeeper dynamic service discovery configs
+    setHAConfigs(hiveConf2);
+    miniHS2_2 = new MiniHS2.Builder().withConf(hiveConf2).cleanupLocalDirOnStartup(false).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (hs2Conn != null) {
+      hs2Conn.close();
+    }
+    if ((miniHS2_1 != null) && miniHS2_1.isStarted()) {
+      miniHS2_1.stop();
+    }
+    if ((miniHS2_2 != null) && miniHS2_2.isStarted()) {
+      miniHS2_2.stop();
+    }
+  }
+
+  private static void setHAConfigs(Configuration conf) {
+    conf.setBoolean(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY.varname, true);
+    conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, zkServer.getConnectString());
+    final String zkRootNamespace = "hs2test";
+    conf.set(ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE.varname, zkRootNamespace);
+    conf.setBoolean(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE.varname, true);
+    conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT.varname, 2, TimeUnit.SECONDS);
+    conf.setTimeDuration(ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME.varname, 100, TimeUnit.MILLISECONDS);
+    conf.setInt(ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES.varname, 1);
+  }
+
+  @Test(timeout = 60000)
+  public void testActivePassive() throws Exception {
+    Map<String, String> confOverlay = new HashMap<>();
+    hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_1.start(confOverlay);
+    while(!miniHS2_1.isStarted() || !miniHS2_1.isLeader()) { // first instance should win the election; poll instead of racing the assert below
+      Thread.sleep(100);
+    }
+
+    hiveConf2.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_2.start(confOverlay);
+    while(!miniHS2_2.isStarted()) {
+      Thread.sleep(100);
+    }
+
+    assertEquals(true, miniHS2_1.isLeader());
+    String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("true", sendGet(url));
+
+    assertEquals(false, miniHS2_2.isLeader());
+    url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("false", sendGet(url));
+
+    url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+    String resp = sendGet(url);
+    ObjectMapper objectMapper = new ObjectMapper();
+    HS2Peers.HS2Instances hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+    int port1 = Integer.parseInt(hiveConf1.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+    assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      if (hsi.getRpcPort() == port1) {
+        assertEquals(true, hsi.isLeader());
+      } else {
+        assertEquals(false, hsi.isLeader());
+      }
+    }
+
+    Configuration conf = new Configuration();
+    setHAConfigs(conf);
+    HS2ActivePassiveHARegistry client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    List<HiveServer2Instance> hs2Instances = new ArrayList<>(client.getAll());
+    assertEquals(2, hs2Instances.size());
+    List<HiveServer2Instance> leaders = new ArrayList<>();
+    List<HiveServer2Instance> standby = new ArrayList<>();
+    for (HiveServer2Instance instance : hs2Instances) {
+      if (instance.isLeader()) {
+        leaders.add(instance);
+      } else {
+        standby.add(instance);
+      }
+    }
+    assertEquals(1, leaders.size());
+    assertEquals(1, standby.size());
+
+    miniHS2_1.stop();
+
+    while(!miniHS2_2.isLeader()) { // HS2 #2 is already started; wait for failover of leadership, not for start
+      Thread.sleep(100);
+    }
+    assertEquals(true, miniHS2_2.isLeader());
+    url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("true", sendGet(url));
+
+    while (client.getAll().size() != 1) {
+      Thread.sleep(100);
+    }
+
+    client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    hs2Instances = new ArrayList<>(client.getAll());
+    assertEquals(1, hs2Instances.size());
+    leaders = new ArrayList<>();
+    standby = new ArrayList<>();
+    for (HiveServer2Instance instance : hs2Instances) {
+      if (instance.isLeader()) {
+        leaders.add(instance);
+      } else {
+        standby.add(instance);
+      }
+    }
+    assertEquals(1, leaders.size());
+    assertEquals(0, standby.size());
+
+    url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+    resp = sendGet(url);
+    objectMapper = new ObjectMapper();
+    hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+    int port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+    assertEquals(1, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      if (hsi.getRpcPort() == port2) {
+        assertEquals(true, hsi.isLeader());
+      } else {
+        assertEquals(false, hsi.isLeader());
+      }
+    }
+
+    // start 1st server again
+    hiveConf1.set(ZkRegistryBase.UNIQUE_IDENTIFIER, UUID.randomUUID().toString());
+    miniHS2_1.start(confOverlay);
+
+    while(!miniHS2_1.isStarted()) {
+      Thread.sleep(100);
+    }
+    assertEquals(false, miniHS2_1.isLeader());
+    url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    assertEquals("false", sendGet(url));
+
+    while (client.getAll().size() != 2) {
+      Thread.sleep(100);
+    }
+
+    client = HS2ActivePassiveHARegistryClient.getClient(conf);
+    hs2Instances = new ArrayList<>(client.getAll());
+    assertEquals(2, hs2Instances.size());
+    leaders = new ArrayList<>();
+    standby = new ArrayList<>();
+    for (HiveServer2Instance instance : hs2Instances) {
+      if (instance.isLeader()) {
+        leaders.add(instance);
+      } else {
+        standby.add(instance);
+      }
+    }
+    assertEquals(1, leaders.size());
+    assertEquals(1, standby.size());
+
+    url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/peers";
+    resp = sendGet(url);
+    objectMapper = new ObjectMapper();
+    hs2Peers = objectMapper.readValue(resp, HS2Peers.HS2Instances.class);
+    port2 = Integer.parseInt(hiveConf2.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname));
+    assertEquals(2, hs2Peers.getHiveServer2Instances().size());
+    for (HiveServer2Instance hsi : hs2Peers.getHiveServer2Instances()) {
+      if (hsi.getRpcPort() == port2) {
+        assertEquals(true, hsi.isLeader());
+      } else {
+        assertEquals(false, hsi.isLeader());
+      }
+    }
+  }
+
+  private String sendGet(String url) throws Exception {
+    URL obj = new URL(url);
+    HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+    con.setRequestMethod("GET");
+    try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream(), "UTF-8"))) {
+      String inputLine;
+      StringBuilder response = new StringBuilder();
+      while ((inputLine = in.readLine()) != null) {
+        response.append(inputLine);
+      }
+      return response.toString();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
index 6cab8cd..d21b764 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/AbstractHiveService.java
@@ -35,14 +35,16 @@ public abstract class AbstractHiveService {
   private String hostname;
   private int binaryPort;
   private int httpPort;
+  private int webPort; // web port supplied through the constructor (MiniHS2 passes the HS2 web UI port)
   private boolean startedHiveService = false;
   private List<String> addedProperties = new ArrayList<String>();
 
-  public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort) {
+  public AbstractHiveService(HiveConf hiveConf, String hostname, int binaryPort, int httpPort, int webPort) {
     this.hiveConf = hiveConf;
     this.hostname = hostname;
     this.binaryPort = binaryPort;
     this.httpPort = httpPort;
+    this.webPort = webPort;
   }
 
   /**
@@ -136,6 +138,10 @@ public abstract class AbstractHiveService {
     return httpPort;
   }
 
+  public int getWebPort() { // returns the web port given to the constructor
+    return webPort;
+  }
+
   public boolean isStarted() {
     return startedHiveService;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 8bbf8a4..997726c 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -217,6 +217,8 @@ public class MiniHS2 extends AbstractHiveService {
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreTestUtils
             .findFreePort()),
         (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreTestUtils
+            .findFreePort()),
+        (usePortsFromConf ? hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT) : MetaStoreTestUtils // web UI port: from conf, or a free port
             .findFreePort()));
     hiveConf.setLongVar(ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS, 3l);
     hiveConf.setTimeVar(ConfVars.HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS, 10,
@@ -306,6 +308,7 @@ public class MiniHS2 extends AbstractHiveService {
     hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost());
     hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort());
     hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort());
+    hiveConf.setIntVar(ConfVars.HIVE_SERVER2_WEBUI_PORT, getWebPort()); // write the chosen web UI port back into the conf
 
     Path scratchDir = new Path(baseFsDir, "scratch");
     // Create root scratchdir with write all, so that user impersonation has no issues.
@@ -404,6 +407,10 @@ public class MiniHS2 extends AbstractHiveService {
   }
 
 
+  public boolean isLeader() { // delegates to the embedded HiveServer2; NOTE(review): unlike getServiceClient(), no verifyStarted() guard
+    return hiveServer2.isLeader();
+  }
+
   public CLIServiceClient getServiceClient() {
     verifyStarted();
     return getServiceClientInternal();

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 5d7f813..6178b4b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,13 +14,16 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+
+import org.apache.hadoop.hive.registry.ServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /**
  * ServiceRegistry interface for switching between fixed host and dynamic registry implementations.
  */
-public interface ServiceRegistry {
+public interface ServiceRegistry<T extends ServiceInstance> { // T: instance type returned by getInstances and passed to state change listeners
 
   /**
    * Start the service registry
@@ -49,14 +52,14 @@ public interface ServiceRegistry {
    * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
    *                              started yet. 0 means do not wait.
    */
-  LlapServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException;
+  ServiceInstanceSet<T> getInstances(String component, long clusterReadyTimeoutMs) throws
+    IOException;
 
   /**
    * Adds state change listeners for service instances.
    * @param listener - state change listener
    */
-  void registerStateChangeListener(
-      ServiceInstanceStateChangeListener<LlapServiceInstance> listener) throws IOException;
+  void registerStateChangeListener(ServiceInstanceStateChangeListener<T> listener) throws IOException;
 
   /**
    * @return The application ID of the LLAP cluster.

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index c88198f..f99d86c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -267,8 +268,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
   }
 
   @Override
-  public void registerStateChangeListener(
-      final ServiceInstanceStateChangeListener<LlapServiceInstance> listener) {
+  public void registerStateChangeListener(final ServiceInstanceStateChangeListener listener) throws IOException { // NOTE(review): raw listener type appears forced by the raw "implements ServiceRegistry"; consider ServiceRegistry<LlapServiceInstance>
     // nothing to set
     LOG.warn("Callbacks for instance state changes are not supported in fixed registry.");
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 80a6aba..3bda40b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -35,7 +36,7 @@ public class LlapRegistryService extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
 
-  private ServiceRegistry registry = null;
+  private ServiceRegistry<LlapServiceInstance> registry = null; // fixed or dynamic registry, now typed by instance
   private final boolean isDaemon;
   private boolean isDynamic = false;
   private String identity = "(pending)";
@@ -136,7 +137,7 @@ public class LlapRegistryService extends AbstractService {
   }
 
   public LlapServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException {
-    return this.registry.getInstances("LLAP", clusterReadyTimeoutMs);
+    return (LlapServiceInstanceSet) this.registry.getInstances("LLAP", clusterReadyTimeoutMs); // NOTE(review): downcast assumes every registry impl returns an LlapServiceInstanceSet
   }
 
   public void registerStateChangeListener(

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 8339230..f5d6202 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -53,7 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LlapZookeeperRegistryImpl
-    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry {
+    extends ZkRegistryBase<LlapServiceInstance> implements ServiceRegistry<LlapServiceInstance> { // registry typed to LLAP instances
   private static final Logger LOG = LoggerFactory.getLogger(LlapZookeeperRegistryImpl.class);
 
   /**
@@ -65,8 +65,6 @@ public class LlapZookeeperRegistryImpl
   private static final String IPC_LLAP = "llap";
   private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
   private final static String NAMESPACE_PREFIX = "llap-";
-  private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String WORKER_PREFIX = "worker-";
   private static final String SLOT_PREFIX = "slot-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
 
@@ -79,7 +77,7 @@ public class LlapZookeeperRegistryImpl
   public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
     super(instanceName, conf,
         HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
-        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+        USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP, // presumably inherited from ZkRegistryBase now that the local constants are removed — confirm
         LlapProxy.isDaemon() ? SASL_LOGIN_CONTEXT_NAME : null,
         HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE),
@@ -225,7 +223,7 @@ public class LlapZookeeperRegistryImpl
 
     @Override
     public String toString() {
-      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
+      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + getHost() + ":" + getRpcPort() + // accessors replace direct field access
           " with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
           ", servicesAddress=" + getServicesAddress() +  ", mgmtPort=" + getManagementPort() + "]";
     }
@@ -327,9 +325,9 @@ public class LlapZookeeperRegistryImpl
       if (data == null) continue;
       String nodeName = extractNodeName(childData);
       if (nodeName.startsWith(WORKER_PREFIX)) {
-        Set<LlapServiceInstance> instances = getInstancesByPath(childData.getPath());
+        LlapServiceInstance instances = getInstanceByPath(childData.getPath()); // a single instance (or null) per worker path; NOTE(review): variable name is still plural
         if (instances != null) {
-          unsorted.addAll(instances);
+          unsorted.add(instances);
         }
       } else if (nodeName.startsWith(SLOT_PREFIX)) {
         slotByWorker.put(extractWorkerIdFromSlot(childData),

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 32d5caa..3208e21 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -55,7 +56,7 @@ public class LlapTokenClient {
   private final SocketFactory socketFactory;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
-  private LlapServiceInstanceSet activeInstances;
+  private ServiceInstanceSet<LlapServiceInstance> activeInstances; // typed via the generic ServiceInstanceSet interface
   private Collection<LlapServiceInstance> lastKnownInstances;
   private LlapManagementProtocolClientImpl client;
   private LlapServiceInstance clientInstance;

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
new file mode 100644
index 0000000..e069e43
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/RegistryUtilities.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+/**
+ * Static helpers shared by the service registries: local host name lookup and
+ * random identifier generation.
+ */
+public class RegistryUtilities {
+  /** Fallback value returned when the local host cannot be resolved. */
+  private static final String LOCALHOST = "localhost";
+
+  /**
+   * Returns the local host name as reported by {@link InetAddress#getLocalHost()}.
+   *
+   * @return the local host name, or {@code "localhost"} if the local host cannot be resolved
+   */
+  public static String getHostName() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException ignored) {
+      return LOCALHOST;
+    }
+  }
+
+  /**
+   * Returns the fully qualified domain name of the local host, obtained through a reverse DNS
+   * lookup ({@link InetAddress#getCanonicalHostName()}).
+   *
+   * @return the FQDN of the local host, or {@code "localhost"} if the local host cannot be
+   *         resolved
+   */
+  public static String getCanonicalHostName() {
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException ignored) {
+      return LOCALHOST;
+    }
+  }
+
+  /**
+   * Generates a new random (type 4) UUID.
+   *
+   * @return the UUID in its canonical string form
+   */
+  public static String getUUID() {
+    return UUID.randomUUID().toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
index 908b3bb..4493e99 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstance.java
@@ -21,27 +21,27 @@ public interface ServiceInstance {
    * Worker identity is a UUID (unique across restarts), to identify a node which died & was brought
    * back on the same host/port
    */
-  public abstract String getWorkerIdentity();
+  String getWorkerIdentity();
 
   /**
    * Hostname of the service instance
    * 
-   * @return
+   * @return service hostname
    */
-  public abstract String getHost();
+  String getHost();
 
   /**
    * RPC Endpoint for service instance
-   * 
-   * @return
+   *
+   * @return rpc port
    */
-  public int getRpcPort();
+  int getRpcPort();
 
   /**
    * Config properties of the Service Instance (llap.daemon.*)
-   * 
-   * @return
+   *
+   * @return properties
    */
-  public abstract Map<String, String> getProperties();
+  Map<String, String> getProperties();
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
index 34fba5c..63178cc 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceSet.java
@@ -29,15 +29,15 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
    * The worker identity does not collide between restarts, so each restart will have a unique id,
    * while having the same host/ip pair.
    * 
-   * @return
+   * @return instance list
    */
   Collection<InstanceType> getAll();
 
   /**
    * Get an instance by worker identity.
    * 
-   * @param name
-   * @return
+   * @param name worker id
+   * @return instance
    */
   InstanceType getInstance(String name);
 
@@ -46,13 +46,13 @@ public interface ServiceInstanceSet<InstanceType extends ServiceInstance> {
    * 
    * The list could include dead and alive instances.
    * 
-   * @param host
-   * @return
+   * @param host hostname
+   * @return instance list
    */
   Set<InstanceType> getByHost(String host);
 
   /**
-   * Get number of instances in the currently availabe.
+   * Get number of instances that are currently available.
    *
    * @return - number of instances
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
index db3d788..de8910c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ServiceInstanceBase.java
@@ -15,6 +15,8 @@ package org.apache.hadoop.hive.registry.impl;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
+
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
@@ -25,26 +27,27 @@ import org.slf4j.LoggerFactory;
 
 public class ServiceInstanceBase implements ServiceInstance {
   private static final Logger LOG = LoggerFactory.getLogger(ServiceInstanceBase.class);
+  private String host;
+  private int rpcPort;
+  private String workerIdentity;
+  private Map<String, String> properties;
 
-  protected final ServiceRecord srv;
-  protected final String host;
-  protected final int rpcPort;
+  // empty c'tor to make jackson happy
+  public ServiceInstanceBase() {
 
-  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
-    this.srv = srv;
+  }
 
+  public ServiceInstanceBase(ServiceRecord srv, String rpcName) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Working with ServiceRecord: {}", srv);
     }
-
     final Endpoint rpc = srv.getInternalEndpoint(rpcName);
-
-    this.host =
-        RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-            AddressTypes.ADDRESS_HOSTNAME_FIELD);
-    this.rpcPort =
-        Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
-            AddressTypes.ADDRESS_PORT_FIELD));
+    this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+        AddressTypes.ADDRESS_HOSTNAME_FIELD);
+    this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
+        AddressTypes.ADDRESS_PORT_FIELD));
+    this.workerIdentity = srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    this.properties = srv.attributes();
   }
 
   @Override
@@ -57,17 +60,19 @@ public class ServiceInstanceBase implements ServiceInstance {
     }
 
     ServiceInstanceBase other = (ServiceInstanceBase) o;
-    return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+    return Objects.equals(getWorkerIdentity(), other.getWorkerIdentity())
+      && Objects.equals(host, other.host)
+      && Objects.equals(rpcPort, other.rpcPort);
   }
 
   @Override
   public int hashCode() {
-    return getWorkerIdentity().hashCode();
+    return Objects.hash(getWorkerIdentity(), host, rpcPort); // null-safe; same fields as equals()
   }
 
   @Override
   public String getWorkerIdentity() {
-    return srv.get(ZkRegistryBase.UNIQUE_IDENTIFIER);
+    return workerIdentity;
   }
 
   @Override
@@ -82,12 +87,28 @@ public class ServiceInstanceBase implements ServiceInstance {
 
   @Override
   public Map<String, String> getProperties() {
-    return srv.attributes();
+    return properties;
+  }
+
+  public void setHost(final String host) {
+    this.host = host;
+  }
+
+  public void setRpcPort(final int rpcPort) {
+    this.rpcPort = rpcPort;
+  }
+
+  public void setWorkerIdentity(final String workerIdentity) {
+    this.workerIdentity = workerIdentity;
+  }
+
+  public void setProperties(final Map<String, String> properties) {
+    this.properties = properties;
   }
 
   @Override
   public String toString() {
     return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host="
-        + host + ":" + rpcPort + "]";
+      + host + ":" + rpcPort + "]";
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
index 0724cf5..d09cb24 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -13,29 +13,26 @@
  */
 package org.apache.hadoop.hive.registry.impl;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
 import org.apache.commons.codec.binary.Base64;
-
-import com.google.common.io.ByteStreams;
-
-import org.apache.tez.common.security.JobTokenIdentifier;
-
-import org.apache.hadoop.security.token.Token;
-
-import java.io.IOException;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.types.AddressTypes;
 import org.apache.hadoop.registry.client.types.Endpoint;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.ByteStreams;
 
 public class TezAmInstance extends ServiceInstanceBase {
   private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class);
   private final int pluginPort;
   private Token<JobTokenIdentifier> token;
 
-  public TezAmInstance(ServiceRecord srv) throws IOException {
+  TezAmInstance(ServiceRecord srv) throws IOException {
     super(srv, TezAmRegistryImpl.IPC_TEZCLIENT);
     final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN);
     if (plugin != null) {
@@ -76,7 +73,7 @@ public class TezAmInstance extends ServiceInstanceBase {
 
   @Override
   public String toString() {
-    return "TezAmInstance [" + getSessionId() + ", host=" + host + ", rpcPort=" + rpcPort +
+    return "TezAmInstance [" + getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() +
         ", pluginPort=" + pluginPort + ", token=" + token + "]";
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
index 417e571..ab02cf4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -37,21 +37,19 @@ public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
   static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token",
       AM_PLUGIN_JOBID = "am.plugin.jobid";
   private final static String NAMESPACE_PREFIX = "tez-am-";
-  private final static String USER_SCOPE_PATH_PREFIX = "user-";
-  private static final String WORKER_PREFIX = "worker-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
 
   private final String registryName;
 
-  public static TezAmRegistryImpl create(Configuration conf, boolean b) {
+  public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
     String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
     return StringUtils.isBlank(amRegistryName) ? null
-        : new TezAmRegistryImpl(amRegistryName, conf, true);
+        : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk);
   }
 
 
   private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) {
-    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX,
+    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, WORKER_GROUP,
         useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null,
         HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL),
         HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE),

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index 17269dd..e7227a8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -13,13 +13,7 @@
  */
 package org.apache.hadoop.hive.registry.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -34,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -44,12 +39,15 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.registry.RegistryUtilities;
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
@@ -65,6 +63,12 @@ import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This is currently used for implementation inheritance only; it doesn't provide a unified flow
  * into which one can just plug a few abstract method implementations, because providing one with
@@ -77,16 +81,18 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private static final Logger LOG = LoggerFactory.getLogger(ZkRegistryBase.class);
   private final static String SASL_NAMESPACE = "sasl";
   private final static String UNSECURE_NAMESPACE = "unsecure";
-
-  static final String UNIQUE_IDENTIFIER = "registry.unique.id";
-  private static final UUID uniq = UUID.randomUUID();
+  protected final static String USER_SCOPE_PATH_PREFIX = "user-";
+  protected static final String WORKER_PREFIX = "worker-";
+  protected static final String WORKER_GROUP = "workers";
+  public static final String UNIQUE_IDENTIFIER = "registry.unique.id";
+  protected static final UUID UNIQUE_ID = UUID.randomUUID();
+  private static final Joiner PATH_JOINER = Joiner.on("/").skipNulls();
 
   protected final Configuration conf;
   protected final CuratorFramework zooKeeperClient;
-  // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
   // workersPath is the directory path where all the worker znodes are located.
   protected final String workersPath;
-  private final String userPathPrefix, workerNodePrefix;
+  private final String workerNodePrefix;
 
   protected final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
 
@@ -99,7 +105,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private final String disableMessage;
 
   private final Lock instanceCacheLock = new ReentrantLock();
-  private final Map<String, Set<InstanceType>> pathToInstanceCache;
+  // there can be only one instance per path
+  private final Map<String, InstanceType> pathToInstanceCache;
+  // there can be multiple instances per node
   private final Map<String, Set<InstanceType>> nodeToInstanceCache;
 
   // The registration znode.
@@ -109,29 +117,22 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private PathChildrenCache instancesCache; // Created on demand.
 
   /** Local hostname. */
-  protected static final String hostname;
-  static {
-    String localhost = "localhost";
-    try {
-      localhost = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException uhe) {
-      // ignore
-    }
-    hostname = localhost;
-  }
+  protected static final String hostname = RegistryUtilities.getCanonicalHostName();
 
   /**
    * @param rootNs A single root namespace override. Not recommended.
-   * @param nsPrefix The namespace prefix to use with default namespaces.
+   * @param nsPrefix The namespace prefix to use with default namespaces (appends 'sasl' for secure else 'unsecure'
+   *                 to namespace prefix to get effective root namespace).
    * @param userScopePathPrefix The prefix to use for the user-specific part of the path.
    * @param workerPrefix The prefix to use for each worker znode.
+   * @param workerGroup group name to use for all workers
    * @param zkSaslLoginContextName SASL login context name for ZK security; null if not needed.
    * @param zkPrincipal ZK security principal.
    * @param zkKeytab ZK security keytab.
    * @param aclsConfig A config setting to use to determine if ACLs should be verified.
    */
   public ZkRegistryBase(String instanceName, Configuration conf, String rootNs, String nsPrefix,
-      String userScopePathPrefix, String workerPrefix,
+      String userScopePathPrefix, String workerPrefix, String workerGroup,
       String zkSaslLoginContextName, String zkPrincipal, String zkKeytab, ConfVars aclsConfig) {
     this.conf = new Configuration(conf);
     this.saslLoginContextName = zkSaslLoginContextName;
@@ -145,29 +146,52 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       this.disableMessage = "";
     }
     this.conf.addResource(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
-    String zkEnsemble = getQuorumServers(this.conf);
     this.encoder = new RegistryUtils.ServiceRecordMarshal();
-    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
-        ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
-        ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
-    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
 
     // sample path: /llap-sasl/hiveuser/hostname/workers/worker-0000000
     // worker-0000000 is the sequence number which will be retained until session timeout. If a
     // worker does not respond due to communication interruptions it will retain the same sequence
     // number when it returns back. If session timeout expires, the node will be deleted and new
     // addition of the same node (restart) will get next sequence number
-    this.userPathPrefix = userScopePathPrefix + getZkPathUser(this.conf);
-    this.workerNodePrefix = workerPrefix;
-    this.workersPath =  "/" + userPathPrefix + "/" + instanceName + "/workers";
+    final String userPathPrefix = userScopePathPrefix == null ? null : userScopePathPrefix + getZkPathUser(conf);
+    this.workerNodePrefix = workerPrefix == null ? WORKER_PREFIX : workerPrefix;
+    this.workersPath =  "/" + PATH_JOINER.join(userPathPrefix, instanceName, workerGroup);
     this.instancesCache = null;
     this.stateChangeListeners = new HashSet<>();
     this.pathToInstanceCache = new ConcurrentHashMap<>();
     this.nodeToInstanceCache = new ConcurrentHashMap<>();
+    final String namespace = getRootNamespace(rootNs, nsPrefix);
+    ACLProvider aclProvider;
+    // get acl provider for most outer path that is non-null
+    if (userPathPrefix == null) {
+      if (instanceName == null) {
+        if (workerGroup == null) {
+          aclProvider = getACLProviderForZKPath(namespace);
+        } else {
+          aclProvider = getACLProviderForZKPath(workerGroup);
+        }
+      } else {
+        aclProvider = getACLProviderForZKPath(instanceName);
+      }
+    } else {
+      aclProvider = getACLProviderForZKPath(userScopePathPrefix);
+    }
+    this.zooKeeperClient = getZookeeperClient(conf, namespace, aclProvider);
+    this.zooKeeperClient.getConnectionStateListenable().addListener(new ZkConnectionStateListener());
+  }
+
+  public static String getRootNamespace(String userProvidedNamespace, String defaultNamespacePrefix) {
+    final boolean isSecure = UserGroupInformation.isSecurityEnabled();
+    String rootNs = userProvidedNamespace;
+    if (rootNs == null) {
+      rootNs = defaultNamespacePrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE);
+    }
+    return rootNs;
+  }
 
+  private ACLProvider getACLProviderForZKPath(String zkPath) {
     final boolean isSecure = UserGroupInformation.isSecurityEnabled();
-    ACLProvider zooKeeperAclProvider = new ACLProvider() {
+    return new ACLProvider() {
       @Override
       public List<ACL> getDefaultAcl() {
         // We always return something from getAclForPath so this should not happen.
@@ -177,31 +201,40 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
       @Override
       public List<ACL> getAclForPath(String path) {
-        if (!isSecure || path == null || !path.contains(userPathPrefix)) {
+        if (!isSecure || path == null || !path.contains(zkPath)) {
           // No security or the path is below the user path - full access.
           return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
         }
         return createSecureAcls();
       }
     };
-    if (rootNs == null) {
-      rootNs = nsPrefix + (isSecure ? SASL_NAMESPACE : UNSECURE_NAMESPACE); // The normal path.
-    }
+  }
+
+  private CuratorFramework getZookeeperClient(Configuration conf, String namespace, ACLProvider zooKeeperAclProvider) {
+    String zkEnsemble = getQuorumServers(conf);
+    int sessionTimeout = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int connectionTimeout = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int baseSleepTime = (int) HiveConf.getTimeVar(conf,
+      ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS);
+    int maxRetries = HiveConf.getIntVar(conf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
 
     // Create a CuratorFramework instance to be used as the ZooKeeper client
     // Use the zooKeeperAclProvider to create appropriate ACLs
-    this.zooKeeperClient = CuratorFrameworkFactory.builder()
-        .connectString(zkEnsemble)
-        .sessionTimeoutMs(sessionTimeout)
-        .aclProvider(zooKeeperAclProvider)
-        .namespace(rootNs)
-        .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
-        .build();
+    return CuratorFrameworkFactory.builder()
+      .connectString(zkEnsemble)
+      .sessionTimeoutMs(sessionTimeout)
+      .connectionTimeoutMs(connectionTimeout)
+      .aclProvider(zooKeeperAclProvider)
+      .namespace(namespace)
+      .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries))
+      .build();
   }
 
   private static List<ACL> createSecureAcls() {
     // Read all to the world
-    List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+    List<ACL> nodeAcls = new ArrayList<>(ZooDefs.Ids.READ_ACL_UNSAFE);
     // Create/Delete/Write/Admin to creator
     nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
     return nodeAcls;
@@ -211,9 +244,9 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
    * Get the ensemble server addresses from the configuration. The format is: host1:port,
    * host2:port..
    *
-   * @param conf
+   * @param conf configuration
    **/
-  private String getQuorumServers(Configuration conf) {
+  private static String getQuorumServers(Configuration conf) {
     String[] hosts = conf.getTrimmedStrings(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname);
     String port = conf.get(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname,
         ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.getDefaultValue());
@@ -238,7 +271,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   protected final String registerServiceRecord(ServiceRecord srv) throws IOException {
     // restart sensitive instance id
-    srv.set(UNIQUE_IDENTIFIER, uniq.toString());
+    srv.set(UNIQUE_IDENTIFIER, UNIQUE_ID.toString());
 
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
@@ -275,11 +308,28 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       CloseableUtils.closeQuietly(znode);
       throw (e instanceof IOException) ? (IOException)e : new IOException(e);
     }
-    return uniq.toString();
+    return UNIQUE_ID.toString();
   }
 
+  protected final void updateServiceRecord(ServiceRecord srv) throws IOException {
+    try {
+      znode.setData(encoder.toBytes(srv));
+
+      if (doCheckAcls) {
+        try {
+          checkAndSetAcls();
+        } catch (Exception ex) {
+          throw new IOException("Error validating or setting ACLs. " + disableMessage, ex);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to update znode with new service record", e);
+      CloseableUtils.closeQuietly(znode);
+      throw (e instanceof IOException) ? (IOException) e : new IOException(e);
+    }
+  }
 
-  protected final void initializeWithoutRegisteringInternal() throws IOException {
+  final void initializeWithoutRegisteringInternal() throws IOException {
     // Create a znode under the rootNamespace parent for this instance of the server
     try {
       try {
@@ -345,8 +395,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   private void addToCache(String path, String host, InstanceType instance) {
     instanceCacheLock.lock();
     try {
-      putInCache(path, pathToInstanceCache, instance);
-      putInCache(host, nodeToInstanceCache, instance);
+      putInInstanceCache(path, pathToInstanceCache, instance);
+      putInNodeCache(host, nodeToInstanceCache, instance);
     } finally {
       instanceCacheLock.unlock();
     }
@@ -368,14 +418,19 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
         path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
   }
 
-  private void putInCache(String key, Map<String, Set<InstanceType>> cache,
+  private void putInInstanceCache(String key, Map<String, InstanceType> cache,
       InstanceType instance) {
+    cache.put(key, instance);
+  }
+
+  private void putInNodeCache(String key, Map<String, Set<InstanceType>> cache,
+    InstanceType instance) {
     Set<InstanceType> instanceSet = cache.get(key);
     if (instanceSet == null) {
-      instanceSet = Sets.newHashSet();
+      instanceSet = new HashSet<>();
       cache.put(key, instanceSet);
     }
-    instanceSet.add(instance);
+    instanceSet.add(instance); // add to both new and already-cached per-host sets
   }
 
   protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) {
@@ -403,7 +458,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   protected abstract InstanceType createServiceInstance(ServiceRecord srv) throws IOException;
 
-  protected static final byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
+  protected static byte[] getWorkerData(ChildData childData, String workerNodePrefix) {
     if (childData == null) return null;
     byte[] data = childData.getData();
     if (data == null) return null;
@@ -415,8 +470,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
     private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
 
     @Override
-    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
-        throws Exception {
+    public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) {
       Preconditions.checkArgument(client != null
           && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
 
@@ -427,28 +481,33 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
         if (!nodeName.startsWith(workerNodePrefix)) return;
         LOG.info("{} for zknode {}", event.getType(), childData.getPath());
         InstanceType instance = extractServiceInstance(event, childData);
-        int ephSeqVersion = extractSeqNum(nodeName);
-        switch (event.getType()) {
-        case CHILD_ADDED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onCreate(instance, ephSeqVersion);
-          }
-          break;
-        case CHILD_UPDATED:
-          addToCache(childData.getPath(), instance.getHost(), instance);
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onUpdate(instance, ephSeqVersion);
-          }
-          break;
-        case CHILD_REMOVED:
-          removeFromCache(childData.getPath(), instance.getHost());
-          for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
-            listener.onRemove(instance, ephSeqVersion);
+        // extractServiceInstance may return null; such events are only logged (else branch below).
+        if (instance != null) {
+          int ephSeqVersion = extractSeqNum(nodeName);
+          switch (event.getType()) {
+            case CHILD_ADDED:
+              addToCache(childData.getPath(), instance.getHost(), instance);
+              for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+                listener.onCreate(instance, ephSeqVersion);
+              }
+              break;
+            case CHILD_UPDATED:
+              addToCache(childData.getPath(), instance.getHost(), instance);
+              for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+                listener.onUpdate(instance, ephSeqVersion);
+              }
+              break;
+            case CHILD_REMOVED:
+              removeFromCache(childData.getPath(), instance.getHost());
+              for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) {
+                listener.onRemove(instance, ephSeqVersion);
+              }
+              break;
+            default:
+              // Ignore all the other events; logged above.
           }
-          break;
-        default:
-          // Ignore all the other events; logged above.
+        } else {
+          LOG.info("instance is null for event: {} childData: {}", event.getType(), childData);
         }
       }
     }
@@ -464,7 +522,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
 
   protected final Set<InstanceType> getByHostInternal(String host) {
     Set<InstanceType> byHost = nodeToInstanceCache.get(host);
-    byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
+    // Substitute an empty set rather than null when the host has no cached instances.
+    byHost = (byHost == null) ? Sets.newHashSet() : byHost;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
     }
@@ -472,11 +530,8 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
   }
 
   protected final Collection<InstanceType> getAllInternal() {
-    Set<InstanceType> instances =  new HashSet<>();
-    for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
-      instances.addAll(instanceSet);
-    }
-    return instances;
+    // Return a copy so callers do not observe later changes to the cache.
+    return new HashSet<>(pathToInstanceCache.values());
   }
 
   private static String extractNodeName(ChildData childData) {
@@ -564,13 +618,22 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
     CloseableUtils.class.getName();
   }
 
+  /**
+   * Closes only the {@code znode} handle (quietly, ignoring close errors); unlike
+   * {@link #stop()}, the instances cache and ZooKeeper client are left open.
+   */
+  protected void unregisterInternal() {
+    CloseableUtils.closeQuietly(znode);
+  }
+
   public void stop() {
     CloseableUtils.closeQuietly(znode);
     CloseableUtils.closeQuietly(instancesCache);
     CloseableUtils.closeQuietly(zooKeeperClient);
   }
 
-  protected final Set<InstanceType> getInstancesByPath(String path) {
+  /** Looks up the single instance cached under {@code path}. */
+  protected final InstanceType getInstanceByPath(String path) {
     return pathToInstanceCache.get(path);
   }
 
@@ -588,4 +646,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> {
       throw e;
     }
   }
+
+  // Logs every connection state change reported by Curator; diagnostic only, takes no action.
+  private class ZkConnectionStateListener implements ConnectionStateListener {
+    @Override
+    public void stateChanged(final CuratorFramework curatorFramework, final ConnectionState connectionState) {
+      LOG.info("Connection state change notification received. State: {}", connectionState);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 0120639..3aec46b 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
@@ -343,7 +344,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
+  /** Finds the registry entry for {@code host}; the host is resolved first since registry names may differ. */
   private LlapServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
     InetAddress address = InetAddress.getByName(host);
-    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
     LlapServiceInstance serviceInstance = null;
 
     // The name used in the service registry may not match the host name we're using.
@@ -375,7 +376,8 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
 
+  /** Picks a random live LLAP instance from the registry. */
   private LlapServiceInstance getServiceInstanceRandom(LlapRegistryService registryService) throws IOException {
-    LlapServiceInstanceSet instanceSet = registryService.getInstances();
+    ServiceInstanceSet<LlapServiceInstance> instanceSet = registryService.getInstances();
     LlapServiceInstance serviceInstance = null;
 
     LOG.info("Finding random live service instance");

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index 58bf8dc..b944fad 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
@@ -230,7 +232,10 @@ public class LlapWebServices extends AbstractService {
           }
           jg.writeStringField("identity", registry.getWorkerIdentity());
           jg.writeArrayFieldStart("peers");
-          for (LlapServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) {
+          ServiceInstanceSet<LlapServiceInstance> instanceSet = registry.getInstances();
+          // Downcast for getAllInstancesOrdered. NOTE(review): this throws ClassCastException if
+          // getInstances() ever returns a ServiceInstanceSet that is not an LlapServiceInstanceSet.
+          for (LlapServiceInstance s : ((LlapServiceInstanceSet) instanceSet).getAllInstancesOrdered(false)) {
             jg.writeStartObject();
             jg.writeStringField("identity", s.getWorkerIdentity());
             jg.writeStringField("host", s.getHost());

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 66de3b8..6ddecca 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,16 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 46cfe56..a051f90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -98,13 +98,22 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   protected TezSessionPoolManager() {
   }
 
-  public void startPool() throws Exception {
+  /**
+   * Starts the default session pool and the expiration tracker (each only if non-null), then
+   * calls {@code initTriggers(conf)}; a non-null {@code resourcePlan} is then passed to {@code updateTriggers}.
+   */
+  public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception {
     if (defaultSessionPool != null) {
       defaultSessionPool.start();
     }
     if (expirationTracker != null) {
       expirationTracker.start();
     }
+    initTriggers(conf);
+    if (resourcePlan != null) {
+      updateTriggers(resourcePlan);
+      LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName());
+    }
   }
 
   public void setupPool(HiveConf conf) throws Exception {
@@ -157,8 +162,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
     llapQueue = new Semaphore(numConcurrentLlapQueries, true);
 
-    initTriggers(conf);
-
     String queueAllowedStr = HiveConf.getVar(initConf,
         ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED);
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index d1b3fec..d3748ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,25 +18,20 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.util.concurrent.SettableFuture;
-import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-import org.apache.hadoop.conf.Configuration;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.Collection;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.security.auth.login.LoginException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 import org.apache.tez.dag.api.TezException;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index b98fb58..046ea19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -738,7 +738,11 @@ public class TezSessionState {
   private Path createTezDir(String sessionId, String suffix) throws IOException {
     // tez needs its own scratch dir (per session)
     // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
-    Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
+    // SessionState.get() can be null here; fall back to the configured SCRATCHDIR in that case.
+    SessionState sessionState = SessionState.get();
+    String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState
+      .getHdfsScratchDirURIString();
+    Path tezDir = new Path(hdfsScratchDir, TEZ_DIR);
     tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix)));
     FileSystem fs = tezDir.getFileSystem(conf);
     FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index bc438bb..1b7321b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -52,7 +53,8 @@ public class Utils {
       LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
 
       Collection<LlapServiceInstance> serviceInstances =
-          serviceRegistry.getInstances().getAllInstancesOrdered(true);
+        serviceRegistry.getInstances().getAllInstancesOrdered(true);
+      // Fail fast if the registry reports no LLAP daemons.
       Preconditions.checkArgument(!serviceInstances.isEmpty(),
           "No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
       ArrayList<String> locations = new ArrayList<>(serviceInstances.size());

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index a8d729d..0d1990a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +113,8 @@ public class LlapClusterStateForCompile {
           return false; // Don't fail; this is best-effort.
         }
       }
-      LlapServiceInstanceSet instances;
+      ServiceInstanceSet<LlapServiceInstance> instances;
+      // NOTE(review): the 10 passed to getInstances is presumably a cluster-ready timeout in ms; confirm.
       try {
         instances = svc.getInstances(10);
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index d261623..d5b683f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.Before;
@@ -90,7 +93,7 @@ public class TestTezSessionPool {
 
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null); // null: no workload-management resource plan
       // this is now a LIFO operation
 
       // draw 1 and replace
@@ -153,7 +156,7 @@ public class TestTezSessionPool {
 
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null); // null: no workload-management resource plan
       TezSessionState[] sessions = new TezSessionState[12];
       int[] queueCounts = new int[3];
       for (int i = 0; i < sessions.length; ++i) {
@@ -234,7 +237,7 @@ public class TestTezSessionPool {
       conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2);
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null); // null: no workload-management resource plan
     } catch (Exception e) {
       LOG.error("Initialization error", e);
       fail();
@@ -295,7 +298,7 @@ public class TestTezSessionPool {
     try {
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
-      poolManager.startPool();
+      poolManager.startPool(conf, null); // null: no workload-management resource plan
     } catch (Exception e) {
       e.printStackTrace();
       fail();

http://git-wip-us.apache.org/repos/asf/hive/blob/21c6a540/service/pom.xml
----------------------------------------------------------------------
diff --git a/service/pom.xml b/service/pom.xml
index 9ad7555..e3774df 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -264,7 +264,49 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
-
+    <!-- Tez artifacts for this module's tests (test scope only). -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-library</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${tez.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>