Posted to commits@hive.apache.org by se...@apache.org on 2016/09/02 18:22:11 UTC

hive git commit: HIVE-14589 : add consistent node replacement to LLAP for splits (Sergey Shelukhin, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master fa1d8c760 -> 9365bec58


HIVE-14589 : add consistent node replacement to LLAP for splits (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9365bec5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9365bec5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9365bec5

Branch: refs/heads/master
Commit: 9365bec584b5a9fbe30f7747b527ca7353dd878e
Parents: fa1d8c7
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Sep 2 11:20:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Sep 2 11:21:32 2016 -0700

----------------------------------------------------------------------
 llap-client/pom.xml                             |  22 ++
 .../hive/llap/registry/ServiceInstanceSet.java  |   7 +-
 .../registry/impl/LlapFixedRegistryImpl.java    |   5 +-
 .../impl/LlapZookeeperRegistryImpl.java         | 220 +++++++-----
 .../hive/llap/registry/impl/SlotZnode.java      | 334 +++++++++++++++++++
 .../hive/llap/security/LlapTokenClient.java     |   4 +-
 .../hive/llap/registry/impl/TestSlotZnode.java  | 269 +++++++++++++++
 .../hive/llap/cli/LlapStatusServiceDriver.java  |  21 +-
 .../tezplugins/LlapTaskSchedulerService.java    |  15 +-
 .../apache/hadoop/hive/ql/exec/tez/Utils.java   |   4 +-
 .../TestHostAffinitySplitLocationProvider.java  |   4 +-
 11 files changed, 790 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
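At a high level, the change registers a slot-N znode for each daemon alongside the existing worker-N znode and orders instances by slot, so a restarted daemon reclaims the lowest free slot and the ordering used for split-to-host affinity stays stable. A minimal, self-contained sketch of that ordering idea (hypothetical names, not the actual LLAP registry classes):

    // Illustrative only: shows lowest-free-slot assignment and slot-based ordering,
    // the scheme used by SlotZnode / getAllInstancesOrdered in this commit.
    import java.util.*;

    public class SlotOrderingSketch {
      // A worker takes the lowest free slot, like SlotZnode.chooseSlotToTake().
      static int takeLowestFreeSlot(SortedSet<Integer> takenSlots) {
        int slot = 0;
        for (int taken : takenSlots) {
          if (slot < taken) break; // found a gap left by a departed worker
          slot = taken + 1;
        }
        takenSlots.add(slot);
        return slot;
      }

      public static void main(String[] args) {
        SortedSet<Integer> taken = new TreeSet<>();
        Map<String, Integer> slotByWorker = new LinkedHashMap<>();
        for (String w : Arrays.asList("workerA", "workerB", "workerC")) {
          slotByWorker.put(w, takeLowestFreeSlot(taken));
        }
        // workerB restarts: its slot is freed and then reclaimed as the lowest free one,
        // so the slot-ordered view of the cluster is unchanged for split scheduling.
        taken.remove(slotByWorker.remove("workerB"));
        slotByWorker.put("workerB-restarted", takeLowestFreeSlot(taken));

        // Readers order instances by slot, analogous to sorting by slot znode name.
        TreeMap<Integer, String> ordered = new TreeMap<>();
        for (Map.Entry<String, Integer> e : slotByWorker.entrySet()) {
          ordered.put(e.getValue(), e.getKey());
        }
        System.out.println(ordered); // {0=workerA, 1=workerB-restarted, 2=workerC}
      }
    }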


http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index 0243340..3bacd2b 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -46,6 +46,28 @@
     </dependency>
     <!-- inter-project -->
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+    <dependency>
+    <groupId>org.apache.curator</groupId>
+      <artifactId>apache-curator</artifactId>
+      <version>${curator.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>${curator.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <version>${commons-lang3.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 99ead9b..13b668d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -13,9 +13,8 @@
  */
 package org.apache.hadoop.hive.llap.registry;
 
-import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public interface ServiceInstanceSet {
@@ -28,14 +27,14 @@ public interface ServiceInstanceSet {
    * 
    * @return
    */
-  public Map<String, ServiceInstance> getAll();
+  public Collection<ServiceInstance> getAll();
 
   /**
    * Gets a list containing all the instances. This list has the same iteration order across
    * different processes, assuming the list of registry entries is the same.
    * @return
    */
-  public List<ServiceInstance> getAllInstancesOrdered();
+  public Collection<ServiceInstance> getAllInstancesOrdered();
 
   /**
    * Get an instance by worker identity.

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index e9456f2..de4d7f2 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -222,8 +223,8 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public Map<String, ServiceInstance> getAll() {
-      return instances;
+    public Collection<ServiceInstance> getAll() {
+      return instances.values();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 64d2617..5e17ebf 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -21,15 +21,18 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +48,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -97,13 +101,19 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   private final static String USER_SCOPE_PATH_PREFIX = "user-";
   private static final String DISABLE_MESSAGE =
       "Set " + ConfVars.LLAP_VALIDATE_ACLS.varname + " to false to disable ACL validation";
+  private static final String WORKER_PREFIX = "worker-";
+  private static final String SLOT_PREFIX = "slot-";
 
   private final Configuration conf;
   private final CuratorFramework zooKeeperClient;
-  private final String pathPrefix, userPathPrefix;
+  // userPathPrefix is the path specific to the user for which ACLs should be restrictive.
+  // workersPath is the directory path where all the worker znodes are located.
+  private final String userPathPrefix, workersPath;
   private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
 
   private PersistentEphemeralNode znode;
+
+  private SlotZnode slotZnode;
   private String znodePath; // unique identity for this instance
   private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
 
@@ -147,7 +157,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     // number when it returns back. If session timeout expires, the node will be deleted and new
     // addition of the same node (restart) will get next sequence number
     this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf);
-    this.pathPrefix = "/" + userPathPrefix + "/" + instanceName + "/workers/worker-";
+    this.workersPath =  "/" + userPathPrefix + "/" + instanceName + "/workers";
     this.instancesCache = null;
     this.instances = null;
     this.stateChangeListeners = new HashSet<>();
@@ -292,10 +302,10 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     try {
       // PersistentEphemeralNode will make sure the ephemeral node created on server will be present
       // even under connection or session interruption (will automatically handle retries)
-      znode = new PersistentEphemeralNode(zooKeeperClient,
-          PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, encoder.toBytes(srv));
+      znode = new PersistentEphemeralNode(zooKeeperClient, Mode.EPHEMERAL_SEQUENTIAL,
+          workersPath + "/" + WORKER_PREFIX, encoder.toBytes(srv));
 
-      // start the creation of znode
+      // start the creation of znodes
       znode.start();
 
       // We'll wait for 120s for node creation
@@ -306,6 +316,14 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       }
 
       znodePath = znode.getActualPath();
+
+      slotZnode = new SlotZnode(
+          zooKeeperClient, workersPath, SLOT_PREFIX, WORKER_PREFIX, uniq.toString());
+      if (!slotZnode.start(znodeCreationTimeout, TimeUnit.SECONDS)) {
+        throw new Exception(
+            "Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
+      }
+
       if (HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)) {
         try {
           checkAndSetAcls();
@@ -313,7 +331,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
           throw new IOException("Error validating or setting ACLs. " + DISABLE_MESSAGE, ex);
         }
       }
-      // Set a watch on the znode
       if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
@@ -325,24 +342,22 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     } catch (Exception e) {
       LOG.error("Unable to create a znode for this server instance", e);
       CloseableUtils.closeQuietly(znode);
+      CloseableUtils.closeQuietly(slotZnode);
       throw (e instanceof IOException) ? (IOException)e : new IOException(e);
     }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created zknode with path: {} service record: {}", znodePath, srv);
     }
+
     return uniq.toString();
   }
 
   private void checkAndSetAcls() throws Exception {
     if (!UserGroupInformation.isSecurityEnabled()) return;
-    String pathToCheck = znodePath;
     // We are trying to check ACLs on the "workers" directory, which noone except us should be
     // able to write to. Higher-level directories shouldn't matter - we don't read them.
-    int ix = pathToCheck.lastIndexOf('/');
-    if (ix > 0) {
-      pathToCheck = pathToCheck.substring(0, ix);
-    }
+    String pathToCheck = workersPath;
     List<ACL> acls = zooKeeperClient.getACL().forPath(pathToCheck);
     if (acls == null || acls.isEmpty()) {
       // Can there be no ACLs? There's some access (to get ACLs), so assume it means free for all.
@@ -506,38 +521,61 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public Map<String, ServiceInstance> getAll() {
-      Map<String, ServiceInstance> instances = new LinkedHashMap<>();
+    public Collection<ServiceInstance> getAll() {
+      List<ServiceInstance> instances = new ArrayList<>();
       // TODO: we could refresh instanceCache here on previous failure
       for (ChildData childData : instancesCache.getCurrentData()) {
-        if (childData != null) {
-          byte[] data = childData.getData();
-          if (data != null) {
-            try {
-              ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
-              ServiceInstance instance = new DynamicServiceInstance(srv);
-              instances.put(childData.getPath(), instance);
-            } catch (IOException e) {
-              LOG.error("Unable to decode data for zkpath: {}." +
-                  " Ignoring from current instances list..", childData.getPath());
-            }
-          }
+        if (childData == null) continue;
+        byte[] data = childData.getData();
+        if (data == null) continue;
+        if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue;
+        try {
+          ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+          ServiceInstance instance = new DynamicServiceInstance(srv);
+          instances.add(instance);
+        } catch (IOException e) {
+          LOG.error("Unable to decode data for zkpath: {}." +
+              " Ignoring from current instances list..", childData.getPath());
         }
       }
       return instances;
     }
 
     @Override
-    public List<ServiceInstance> getAllInstancesOrdered() {
-      List<ServiceInstance> list = new LinkedList<>();
-      list.addAll(instances.getAll().values());
-      Collections.sort(list, new Comparator<ServiceInstance>() {
-        @Override
-        public int compare(ServiceInstance o1, ServiceInstance o2) {
-          return o2.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
+    public Collection<ServiceInstance> getAllInstancesOrdered() {
+      Map<String, String> slotByWorker = new HashMap<String, String>();
+      List<ServiceInstance> unsorted = new LinkedList<ServiceInstance>();
+      for (ChildData childData : instancesCache.getCurrentData()) {
+        if (childData == null) continue;
+        byte[] data = childData.getData();
+        if (data == null) continue;
+        String nodeName = extractNodeName(childData);
+        if (nodeName.startsWith(WORKER_PREFIX)) {
+          try {
+            ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+            ServiceInstance instance = new DynamicServiceInstance(srv);
+            unsorted.add(instance);
+          } catch (IOException e) {
+            LOG.error("Unable to decode data for zkpath: {}." +
+                " Ignoring from current instances list..", childData.getPath());
+          }
+        } else if (nodeName.startsWith(SLOT_PREFIX)) {
+          slotByWorker.put(extractWorkerIdFromSlot(childData), nodeName);
+        } else {
+          LOG.info("Ignoring unknown node {}", childData.getPath());
         }
-      });
-      return list;
+      }
+
+      TreeMap<String, ServiceInstance> sorted = new TreeMap<>();
+      for (ServiceInstance worker : unsorted) {
+        String slot = slotByWorker.get(worker.getWorkerIdentity());
+        if (slot == null) {
+          LOG.info("Unknown slot for {}", worker.getWorkerIdentity());
+          continue;
+        }
+        sorted.put(slot, worker);
+      }
+      return sorted.values();
     }
 
     @Override
@@ -563,23 +601,22 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     public Set<ServiceInstance> getByHost(String host) {
       Set<ServiceInstance> byHost = new HashSet<>();
       for (ChildData childData : instancesCache.getCurrentData()) {
-        if (childData != null) {
-          byte[] data = childData.getData();
-          if (data != null) {
-            try {
-              ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
-              ServiceInstance instance = new DynamicServiceInstance(srv);
-              if (host.equals(instance.getHost())) {
-                byHost.add(instance);
-              }
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Locality comparing " + host + " to " + instance.getHost());
-              }
-            } catch (IOException e) {
-              LOG.error("Unable to decode data for zkpath: {}." +
-                  " Ignoring host from current instances list..", childData.getPath());
-            }
+        if (childData == null) continue;
+        byte[] data = childData.getData();
+        if (data == null) continue;
+        if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue;
+        try {
+          ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
+          ServiceInstance instance = new DynamicServiceInstance(srv);
+          if (host.equals(instance.getHost())) {
+            byHost.add(instance);
           }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Locality comparing " + host + " to " + instance.getHost());
+          }
+        } catch (IOException e) {
+          LOG.error("Unable to decode data for zkpath: {}." +
+              " Ignoring host from current instances list..", childData.getPath());
         }
       }
 
@@ -595,6 +632,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     }
   }
 
+  // TODO: make class static? fields leak
   private class InstanceStateChangeListener implements PathChildrenCacheListener {
     private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class);
 
@@ -605,43 +643,59 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
           && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
 
       synchronized (this) {
-        if (!stateChangeListeners.isEmpty()) {
-          ServiceInstance instance = null;
-          ChildData childData = event.getData();
-          if (childData != null) {
-            byte[] data = childData.getData();
-            if (data != null) {
-              try {
-                ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data);
-                instance = new DynamicServiceInstance(srv);
-              } catch (IOException e) {
-                LOG.error("Unable to decode data for zknode: {}." +
-                    " Dropping notification of type: {}", childData.getPath(), event.getType());
-              }
-            }
-          }
-
-          // notify listeners of the new data
-          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
-            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
-              LOG.info("Added zknode {} to llap namespace. Notifying state change listener.",
-                  event.getData().getPath());
-              listener.onCreate(instance);
-            } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
-              LOG.info("Updated zknode {} in llap namespace. Notifying state change listener.",
-                  event.getData().getPath());
-              listener.onUpdate(instance);
-            } else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
-              LOG.info("Removed zknode {} from llap namespace. Notifying state change listener.",
-                  event.getData().getPath());
-              listener.onRemove(instance);
-            }
+        if (stateChangeListeners.isEmpty()) return;
+        ChildData childData = event.getData();
+        if (childData == null) return;
+        String nodeName = extractNodeName(childData);
+        if (!nodeName.startsWith(WORKER_PREFIX)) return; // No need to propagate slot updates.
+        LOG.info("{} for zknode {} in llap namespace", event.getType(), childData.getPath());
+        ServiceInstance instance = extractServiceInstance(event, childData);
+        for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
+          switch (event.getType()) {
+          case CHILD_ADDED:
+            listener.onCreate(instance);
+            break;
+          case CHILD_UPDATED:
+            listener.onUpdate(instance);
+            break;
+          case CHILD_REMOVED:
+            listener.onRemove(instance);
+            break;
+          default:
+            // Ignore all the other events; logged above.
           }
         }
       }
     }
   }
 
+  private static String extractWorkerIdFromSlot(ChildData childData) {
+    return new String(childData.getData(), SlotZnode.CHARSET);
+  }
+
+  private static String extractNodeName(ChildData childData) {
+    String nodeName = childData.getPath();
+    int ix = nodeName.lastIndexOf("/");
+    if (ix >= 0) {
+      nodeName = nodeName.substring(ix + 1);
+    }
+    return nodeName;
+  }
+
+  private ServiceInstance extractServiceInstance(
+      PathChildrenCacheEvent event, ChildData childData) {
+    byte[] data = childData.getData();
+    if (data == null) return null;
+    try {
+      ServiceRecord srv = encoder.fromBytes(event.getData().getPath(), data);
+      return new DynamicServiceInstance(srv);
+    } catch (IOException e) {
+      LOG.error("Unable to decode data for zknode: {}." +
+          " Dropping notification of type: {}", childData.getPath(), event.getType());
+      return null;
+    }
+  }
+  
   @Override
   public ServiceInstanceSet getInstances(String component) throws IOException {
     checkPathChildrenCache();
@@ -669,8 +723,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
 
     // lazily create PathChildrenCache
     if (instancesCache == null) {
-      this.instancesCache = new PathChildrenCache(zooKeeperClient,
-          RegistryPathUtils.parentOf(pathPrefix).toString(), true);
+      this.instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true);
       instancesCache.getListenable().addListener(new InstanceStateChangeListener(),
           Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
               .setDaemon(true)
@@ -698,6 +751,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   @Override
   public void stop() throws IOException {
     CloseableUtils.closeQuietly(znode);
+    CloseableUtils.closeQuietly(slotZnode);
     CloseableUtils.closeQuietly(instancesCache);
     CloseableUtils.closeQuietly(zooKeeperClient);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java
new file mode 100644
index 0000000..362dd91
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/SlotZnode.java
@@ -0,0 +1,334 @@
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *   http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing,
+   * software distributed under the License is distributed on an
+   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   * KIND, either express or implied.  See the License for the
+   * specific language governing permissions and limitations
+   * under the License.
+   */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * We would have used the curator ephemeral node with some extra logic but it doesn't handle
+ * the EXISTS condition, which is crucial here; so we c/p parts of Curator and add our logic.
+ */
+public class SlotZnode implements Closeable {
+  static final Charset CHARSET = StandardCharsets.UTF_8;
+  private final static Logger LOG = LoggerFactory.getLogger(SlotZnode.class);
+
+  private final AtomicReference<CountDownLatch> initialCreateLatch =
+      new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+  private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
+  private final Random rdm = new Random();
+  private final CuratorFramework client;
+  private final String basePath, prefix, workerPrefix, dataStr;
+  private final byte[] data;
+  private int currentSlot;
+  private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+  private int fallbackCount = 0; // Test-only counter.
+  private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
+    @Override
+    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+      processCreateResult(client, event);
+    }
+  };
+  private final Watcher watcher = new Watcher() {
+    @Override
+    public void process(WatchedEvent event) {
+      processWatchedEvent(event);
+    }
+  };
+  private final BackgroundCallback checkExistsCallback = new BackgroundCallback() {
+    @Override
+    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+      processWatchResult(event);
+    }
+  };
+  private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+      processConnectionState(newState);
+    }
+  };
+
+  private enum State {
+    LATENT,
+    INITIAL_SELECTION,
+    AFTER_SELECTION,
+    CLOSED
+  }
+
+  public SlotZnode(
+      CuratorFramework client, String basePath, String prefix, String workerPrefix, String data) {
+    this.client = Preconditions.checkNotNull(client, "client cannot be null");
+    this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
+    this.prefix = Preconditions.checkNotNull(prefix, "prefix cannot be null");
+    this.workerPrefix = workerPrefix;
+    Preconditions.checkNotNull(data, "data cannot be null");
+    this.dataStr = data;
+    this.data = data.getBytes(CHARSET);
+  }
+
+  @VisibleForTesting
+  public int getFallbackCount() {
+    return fallbackCount;
+  }
+
+  private void chooseSlotToTake() throws Exception {
+    int slotToTake = -1;
+    while (true) {
+      List<String> allChildNodes;
+      try {
+        allChildNodes = client.getChildren().forPath(basePath);
+      } catch (Exception e) {
+        LOG.error("Cannot list nodes to get slots; failing", e);
+        throw e;
+      }
+      TreeSet<Integer> slots = new TreeSet<>();
+      int approxWorkerCount = 0;
+      for (String child : allChildNodes) {
+        if (!child.startsWith(prefix)) {
+          if (child.startsWith(workerPrefix)) {
+            ++approxWorkerCount;
+          }
+        } else {
+          slots.add(Integer.parseInt(child.substring(prefix.length())));
+        }
+      }
+      Iterator<Integer> slotIter = slots.iterator();
+      slotToTake = 0;
+      while (slotIter.hasNext()) {
+        int nextTaken = slotIter.next();
+        if (slotToTake < nextTaken) break;
+        slotToTake = nextTaken + 1;
+      }
+      // There may be a race for this slot so re-query after a delay with some probability.
+      if (slotToTake != currentSlot || !shouldFallBackOnCollision(approxWorkerCount)) break;
+      ++fallbackCount;
+      Thread.sleep(rdm.nextInt(200)); // arbitrary
+    }
+
+    currentSlot = slotToTake;
+    LOG.info("Will attempt to take slot " + currentSlot);
+  }
+
+
+  private boolean shouldFallBackOnCollision(int approxWorkerCount) {
+    // Ideally, we'd want 1 worker to try for every slot; e.g. if there are 4 workers we want 3
+    // to re-read, i.e. probability of falling back = 0.75, or 1/4 < random([0,1)). However, we
+    // make it slightly more probable (2.0x) to avoid too much re-reading. This is hand-wavy.
+    if (approxWorkerCount == 0) return false;
+    return (2.0f / approxWorkerCount) <= rdm.nextDouble();
+  }
+
+  private String getSlotPath(int slot) {
+    return String.format("%s/%s%010d", basePath, prefix, slot);
+  }
+
+  public boolean start(long timeout, TimeUnit unit) throws Exception {
+    Preconditions.checkState(state.compareAndSet(State.LATENT, State.INITIAL_SELECTION), "Already started");
+    CountDownLatch localLatch = initialCreateLatch.get();
+    client.getConnectionStateListenable().addListener(connectionStateListener);
+    chooseSlotToTake();
+    startCreateCurrentNode();
+    return localLatch.await(timeout, unit);
+  }
+
+  @Override
+  public void close() throws IOException {
+    State currentState = state.getAndSet(State.CLOSED);
+    if (currentState == State.CLOSED || currentState == State.LATENT) return;
+    client.getConnectionStateListenable().removeListener(connectionStateListener);
+    String localNodePath = nodePath.getAndSet(null);
+    if (localNodePath == null) return;
+    try {
+      client.delete().guaranteed().forPath(localNodePath);
+    } catch (KeeperException.NoNodeException ignore) {
+    } catch (Exception e) {
+      LOG.error("Deleting node: " + localNodePath, e);
+      throw new IOException(e);
+    }
+  }
+
+  public int getCurrentSlot() {
+    assert isActive();
+    return currentSlot;
+  }
+
+  private void startCreateCurrentNode() {
+    if (!isActive()) return;
+    String createPath = null;
+    try {
+      createPath = getSlotPath(currentSlot);
+      LOG.info("Attempting to create " + createPath);
+      client.create().withMode(CreateMode.EPHEMERAL).inBackground(backgroundCallback)
+        .forPath(createPath, data);
+    } catch (Exception e) {
+      LOG.error("Creating node. Path: " + createPath, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void watchNode() throws Exception {
+    if (!isActive()) return;
+    String localNodePath = nodePath.get();
+    if (localNodePath == null) return;
+    try {
+      client.checkExists().usingWatcher(watcher).inBackground(
+          checkExistsCallback).forPath(localNodePath);
+    } catch (Exception e) {
+      LOG.error("Watching node: " + localNodePath, e);
+      throw e;
+    }
+  }
+
+  private boolean isActive() {
+    State localState = state.get();
+    return (localState != State.LATENT && localState != State.CLOSED);
+  }
+
+  private void processWatchResult(CuratorEvent event) throws Exception {
+    if (event.getResultCode() != KeeperException.Code.NONODE.intValue()) return;
+    LOG.info("Trying to reacquire because of the NONODE event");
+    startCreateCurrentNode();
+  }
+
+
+  private void processConnectionState(ConnectionState newState) {
+    if (newState != ConnectionState.RECONNECTED) return;
+    LOG.info("Trying to reacquire because of the RECONNECTED event");
+    startCreateCurrentNode();
+  }
+
+
+  private void processWatchedEvent(WatchedEvent event) {
+    if (event.getType() != EventType.NodeDeleted) return;
+    String localPath = nodePath.get();
+    if (localPath == null) return;
+    if (!localPath.equals(event.getPath())) {
+      LOG.info("Ignoring the NodeDeleted event for " + event.getPath());
+      return;
+    }
+    LOG.info("Trying to reacquire because of the NodeDeleted event");
+    startCreateCurrentNode();
+  }
+
+
+  private void processCreateResult(CuratorFramework client, CuratorEvent event) throws Exception {
+    boolean doesExist = event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue();
+    if (!doesExist && event.getResultCode() != KeeperException.Code.OK.intValue()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Trying to reacquire due to create error: " + event);
+      }
+      startCreateCurrentNode(); // TODO: a pattern from Curator. Better error handling?
+      return;
+    }
+    State localState = state.get();
+    switch (localState) {
+    case CLOSED:
+    case LATENT:
+      return;
+    case INITIAL_SELECTION:
+      if (doesExist) {
+        LOG.info("Slot " + currentSlot + " was occupied");
+        chooseSlotToTake(); // Try another slot.
+        startCreateCurrentNode();
+      } else {
+        handleCreatedNode(event.getName());
+      }
+      break;
+    case AFTER_SELECTION:
+      if (doesExist) {
+        processExistsFromCreate(client, event.getPath());
+      } else {
+        handleCreatedNode(event.getName());
+      }
+      break;
+    default:
+      throw new AssertionError("Unknown state " + localState);
+    }
+  }
+
+
+  private void processExistsFromCreate(CuratorFramework client, String path) throws Exception {
+    byte[] actual;
+    try {
+      actual = client.getData().forPath(path);
+    } catch (Exception ex) {
+      LOG.error("Error getting data for the node; will retry creating", ex);
+      startCreateCurrentNode();
+      return;
+    }
+    if (Arrays.equals(actual, data)) {
+      handleCreatedNode(path);
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Data at {} is from a different node: {} (we are {})",
+            path, new String(actual, CHARSET), dataStr);
+      }
+      nodePath.getAndSet(null);
+      chooseSlotToTake(); // Try another slot.
+      startCreateCurrentNode();
+    }
+  }
+
+  private void handleCreatedNode(String path) throws Exception {
+    while (true) {
+      State localState = state.get();
+      if (localState == State.CLOSED || localState == State.LATENT) return;
+      if (state.compareAndSet(localState, State.AFTER_SELECTION)) break;
+    }
+    nodePath.set(path);
+    watchNode();
+    CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
+    if (localLatch != null) {
+      localLatch.countDown();
+    }
+    LOG.info("Acquired the slot znode {}{}", path,
+        localLatch != null ? "; this is the initial assignment" : "");
+  }
+
+  @VisibleForTesting
+  public String getActualPath() {
+    return nodePath.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
index 921e050..ace9475 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenClient.java
@@ -137,11 +137,11 @@ public class LlapTokenClient {
       registry.start();
       activeInstances = registry.getInstances();
     }
-    Map<String, ServiceInstance> daemons = activeInstances.getAll();
+    Collection<ServiceInstance> daemons = activeInstances.getAll();
     if (daemons == null || daemons.isEmpty()) {
       throw new RuntimeException("No LLAPs found");
     }
-    lastKnownInstances = daemons.values();
+    lastKnownInstances = daemons;
     return new ArrayList<ServiceInstance>(lastKnownInstances);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java
----------------------------------------------------------------------
diff --git a/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java b/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java
new file mode 100644
index 0000000..209168f
--- /dev/null
+++ b/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestSlotZnode.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.DebugUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/** See SlotZNode; some tests (and the setup) are c/p-ed or modified from Curator. */
+public class TestSlotZnode {
+  private static final String DIR = "/test";
+  private static final String PATH = ZKPaths.makePath(DIR, "/foo");
+  private final Collection<CuratorFramework> curatorInstances = Lists.newArrayList();
+  private final Collection<SlotZnode> createdNodes = Lists.newArrayList();
+  private TestingServer server;
+
+  private static Logger LOG = LoggerFactory.getLogger(TestSlotZnode.class);
+
+  @Before
+  public void setUp() throws Exception {
+    System.setProperty(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
+    while (server == null) {
+      try {
+        server = new TestingServer();
+      } catch (BindException e) {
+        LOG.warn("Getting bind exception - retrying to allocate server");
+        server = null;
+      }
+    }
+  }
+
+  @After
+  public void teardown() throws Exception {
+    for (SlotZnode node : createdNodes) {
+      node.close();
+    }
+    for (CuratorFramework curator : curatorInstances) {
+      curator.close();
+    }
+    server.close();
+    server = null;
+  }
+
+  @Test
+  public void testDeletesNodeWhenClosed() throws Exception {
+    CuratorFramework curator = newCurator();
+    SlotZnode node = createZnode(curator);
+    assertTrue(node.start(5, TimeUnit.SECONDS));
+    String path = null;
+    try {
+      path = node.getActualPath();
+      assertNodeExists(curator, path);
+    } finally {
+      node.close(); // After closing the path is set to null...
+    }
+    assertNodeDoesNotExist(curator, path);
+  }
+
+  @Test
+  public void testDeletedAndRecreatedNodeWhenSessionReconnects() throws Exception {
+    CuratorFramework curator = newCurator();
+    CuratorFramework observer = newCurator();
+    SlotZnode node = createZnode(curator);
+    assertTrue(node.start(5, TimeUnit.SECONDS));
+    String originalPath = node.getActualPath();
+    assertNodeExists(observer, originalPath);
+    Trigger deletedTrigger = Trigger.deleted();
+    observer.checkExists().usingWatcher(deletedTrigger).forPath(node.getActualPath());
+    killSession(curator);
+    // Make sure the node got deleted.
+    assertTrue(deletedTrigger.firedWithin(5, TimeUnit.SECONDS));
+    // Check for it to be recreated.
+    Trigger createdTrigger = Trigger.created();
+    Stat stat = observer.checkExists().usingWatcher(createdTrigger).forPath(originalPath);
+    assertTrue(stat != null || createdTrigger.firedWithin(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testRecreatesNodeWhenItGetsDeleted() throws Exception {
+    CuratorFramework curator = newCurator();
+    SlotZnode node = createZnode(curator);
+    assertTrue(node.start(5, TimeUnit.SECONDS));
+    String originalNode = node.getActualPath();
+    assertNodeExists(curator, originalNode);
+    // Delete the original node...
+    curator.delete().forPath(originalNode);
+    Trigger createdWatchTrigger = Trigger.created();
+    Stat stat = curator.checkExists().usingWatcher(createdWatchTrigger).forPath(originalNode);
+    assertTrue(stat != null || createdWatchTrigger.firedWithin(5, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testPathUsage() throws Exception {
+    CuratorFramework curator = newCurator();
+    SlotZnode node1 = createZnode(curator),
+        node2 = createZnode(curator), node3 = createZnode(curator);
+    assertTrue(node1.start(5, TimeUnit.SECONDS));
+    String path1 = node1.getActualPath();
+    assertTrue(node2.start(5, TimeUnit.SECONDS));
+    String path2 = node2.getActualPath();
+    assertFalse(path1.equals(path2));
+    node1.close();
+    // Path must be reused.
+    assertTrue(node3.start(5, TimeUnit.SECONDS));
+    assertTrue(path1.equals(node3.getActualPath()));
+  }
+
+  private void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+    cdlIn.countDown();
+    try {
+      cdlOut.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testConcurrencyAndFallback() throws Exception {
+    concurrencyTest(100, true);
+  }
+
+  @Test
+  public void testConcurrencyNoFallback() throws Exception {
+    concurrencyTest(100, false);
+  }
+
+  private void concurrencyTest(final int nodeCount, boolean isFallback) throws Exception {
+    final CuratorFramework curator = newCurator();
+    ExecutorService executor = Executors.newFixedThreadPool(nodeCount);
+    final CountDownLatch cdlIn = new CountDownLatch(nodeCount), cdlOut = new CountDownLatch(1);
+    @SuppressWarnings("unchecked")
+    FutureTask<SlotZnode>[] startTasks = new FutureTask[nodeCount];
+    for (int i = 0; i < nodeCount; ++i) {
+      if (isFallback) {
+        curator.create().creatingParentsIfNeeded().forPath(PATH + "/worker-" + i);
+      }
+      startTasks[i] = new FutureTask<SlotZnode>(new Callable<SlotZnode>() {
+        SlotZnode node = createZnode(curator);
+        public SlotZnode call() throws Exception {
+          syncThreadStart(cdlIn, cdlOut);
+          return node.start(5, TimeUnit.SECONDS) ? node : null;
+        }
+      });
+    }
+    for (int i = 0; i < startTasks.length; ++i) {
+      executor.execute(startTasks[i]);
+    }
+    cdlIn.await();
+    cdlOut.countDown();
+    boolean[] found = new boolean[nodeCount];
+    int totalFallbackCount = 0;
+    for (int i = 0; i < startTasks.length; ++i) {
+      SlotZnode node = startTasks[i].get();
+      assertNotNull(node);
+      totalFallbackCount += node.getFallbackCount();
+      int slot = node.getCurrentSlot();
+      assertTrue(slot < found.length);
+      assertFalse(found[slot]); // Given these 2 lines we don't need to double check later.
+      found[slot] = true;
+    }
+    if (isFallback) {
+      LOG.info("Total fallback count " + totalFallbackCount);
+      assertTrue(totalFallbackCount > 0);
+    }
+  }
+
+  private CuratorFramework newCurator() throws IOException {
+    CuratorFramework client = CuratorFrameworkFactory.newClient(
+        server.getConnectString(), 10000, 10000, new RetryOneTime(1));
+    client.start();
+    curatorInstances.add(client);
+    return client;
+  }
+
+  private SlotZnode createZnode(CuratorFramework curator) throws Exception {
+    if (curator.checkExists().forPath(PATH) == null) {
+      curator.create().creatingParentsIfNeeded().forPath(PATH);
+    }
+    SlotZnode result = new SlotZnode(curator, PATH, "slot-", "worker-", "");
+    createdNodes.add(result);
+    return result;
+  }
+
+  private void assertNodeExists(CuratorFramework curator, String path) throws Exception {
+    assertNotNull(path);
+    assertTrue(curator.checkExists().forPath(path) != null);
+  }
+
+  private void assertNodeDoesNotExist(CuratorFramework curator, String path) throws Exception {
+    assertTrue(curator.checkExists().forPath(path) == null);
+  }
+
+  public void killSession(CuratorFramework curator) throws Exception {
+    KillSession.kill(curator.getZookeeperClient().getZooKeeper(), curator.getZookeeperClient().getCurrentConnectionString());
+  }
+
+  private static final class Trigger implements Watcher {
+    private final Event.EventType type;
+    private final CountDownLatch latch;
+    public Trigger(Event.EventType type) {
+      assertNotNull(type);
+      this.type = type;
+      this.latch = new CountDownLatch(1);
+    }
+    @Override
+    public void process(WatchedEvent event) {
+      if (type == event.getType()) {
+        latch.countDown();
+      }
+    }
+    public boolean firedWithin(long duration, TimeUnit unit) {
+      try {
+        return latch.await(duration, unit);
+      } catch (InterruptedException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+    private static Trigger created() {
+      return new Trigger(Event.EventType.NodeCreated);
+    }
+    private static Trigger deleted() {
+      return new Trigger(Event.EventType.NodeDeleted);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
index 17ce69b..0efe545 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -484,34 +485,30 @@ public class LlapStatusServiceDriver {
           "Failed to create llap registry client", e);
     }
     try {
-      Map<String, ServiceInstance> serviceInstanceMap;
+      Collection<ServiceInstance> serviceInstances;
       try {
-        serviceInstanceMap = llapRegistry.getInstances().getAll();
+        serviceInstances = llapRegistry.getInstances().getAll();
       } catch (IOException e) {
         throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e);
       }
 
-      if (serviceInstanceMap == null || serviceInstanceMap.isEmpty()) {
+      if (serviceInstances == null || serviceInstances.isEmpty()) {
         LOG.info("No information found in the LLAP registry");
         appStatusBuilder.setLiveInstances(0);
         appStatusBuilder.setState(State.LAUNCHING);
         appStatusBuilder.clearLlapInstances();
         return ExitCode.SUCCESS;
       } else {
-
-
         // Tracks instances known by both slider and llap.
         List<LlapInstance> validatedInstances = new LinkedList<>();
         List<String> llapExtraInstances = new LinkedList<>();
 
-        for (Map.Entry<String, ServiceInstance> serviceInstanceEntry : serviceInstanceMap
-            .entrySet()) {
-
-          ServiceInstance serviceInstance = serviceInstanceEntry.getValue();
-          String containerIdString = serviceInstance.getProperties().get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
-
+        for (ServiceInstance serviceInstance : serviceInstances) {
+          String containerIdString = serviceInstance.getProperties().get(
+              HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
 
-          LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(containerIdString);
+          LlapInstance llapInstance = appStatusBuilder.removeAndgetLlapInstanceForContainer(
+              containerIdString);
           if (llapInstance != null) {
             llapInstance.setMgmtPort(serviceInstance.getManagementPort());
             llapInstance.setRpcPort(serviceInstance.getRpcPort());

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index efd774d..10d9ad1 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -313,7 +313,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       registry.start();
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
+      for (ServiceInstance inst : activeInstances.getAll()) {
         addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
             metrics));
       }
@@ -326,21 +326,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class);
 
     @Override
-    public void onCreate(final ServiceInstance serviceInstance) {
+    public void onCreate(ServiceInstance serviceInstance) {
       addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
           numSchedulableTasksPerNode, metrics));
       LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity());
     }
 
     @Override
-    public void onUpdate(final ServiceInstance serviceInstance) {
+    public void onUpdate(ServiceInstance serviceInstance) {
       instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
           nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
       LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
     }
 
     @Override
-    public void onRemove(final ServiceInstance serviceInstance) {
+    public void onRemove(ServiceInstance serviceInstance) {
       // FIXME: disabling this for now
       // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
       LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
@@ -444,7 +444,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     readLock.lock();
     try {
       int numInstancesFound = 0;
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
+      for (ServiceInstance inst : activeInstances.getAll()) {
         if (inst.isAlive()) {
           Resource r = inst.getResource();
           memory += r.getMemory();
@@ -494,7 +494,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     readLock.lock();
     try {
       int n = 0;
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
+      for (ServiceInstance inst : activeInstances.getAll()) {
         if (inst.isAlive()) {
           n++;
         }
@@ -811,7 +811,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     /* check again whether nodes are disabled or just missing */
     writeLock.lock();
     try {
-      for (ServiceInstance inst : activeInstances.getAll().values()) {
+      for (ServiceInstance inst : activeInstances.getAll()) {
         if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) {
           /* that's a good node, not added to the allocations yet */
           LOG.info("Found a new node: " + inst + ".");
@@ -825,7 +825,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   }
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
-    LOG.info("Adding node: " + inst);
     // we have just added a new node. Signal timeout monitor to reset timer
     if (activeInstances.size() == 1) {
       LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 8a4fc08..2e9918e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,7 @@ public class Utils {
       LlapRegistryService serviceRegistry;
       serviceRegistry = LlapRegistryService.getClient(conf);
 
-      List<ServiceInstance> serviceInstances =
+      Collection<ServiceInstance> serviceInstances =
           serviceRegistry.getInstances().getAllInstancesOrdered();
       String[] locations = new String[serviceInstances.size()];
       int i = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/9365bec5/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 54f7363..7ed3df1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -131,9 +131,9 @@ public class TestHostAffinitySplitLocationProvider {
           newMsg = newLoc + " splits went to the new node";
       LOG.info(movedMsg + " and " + newMsg + msgTail);
       double maxMoved = 1.0f * splits.length / locs, minNew = 1.0f * splits.length / locs;
-      movedRatioSum += Math.max(moved / maxMoved, 1f);
+      movedRatioSum += moved / maxMoved;
       movedRatioWorst = Math.max(moved / maxMoved, movedRatioWorst);
-      newRatioSum += Math.min(newLoc / minNew, 1f);
+      newRatioSum += newLoc / minNew;
       newRatioWorst = Math.min(newLoc / minNew, newRatioWorst);
       logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, maxMoved, minNew);
     }