You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/02 09:44:09 UTC

hive git commit: HIVE-15242: LLAP: Act on Node update notifications from registry, fix isAlive checks (Siddharth Seth, Sergey Shelukhin reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master d53aa79ca -> 4e0754681


HIVE-15242: LLAP: Act on Node update notifications from registry, fix isAlive checks (Siddharth Seth, Sergey Shelukhin reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4e075468
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4e075468
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4e075468

Branch: refs/heads/master
Commit: 4e0754681cbf4d0f72ddc958e1999b1408ff9684
Parents: d53aa79
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Dec 2 01:43:54 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Dec 2 01:43:54 2016 -0800

----------------------------------------------------------------------
 .../hive/llap/registry/ServiceInstance.java     |   7 -
 .../hive/llap/registry/ServiceInstanceSet.java  |   5 +
 .../registry/impl/InactiveServiceInstance.java  |   5 -
 .../registry/impl/LlapFixedRegistryImpl.java    |   5 -
 .../impl/LlapZookeeperRegistryImpl.java         |  22 +-
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |   4 +-
 .../tezplugins/LlapTaskSchedulerService.java    | 335 +++++++++----------
 .../TestLlapTaskSchedulerService.java           |   2 +-
 8 files changed, 176 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
index 9004d3c..081995c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -66,13 +66,6 @@ public interface ServiceInstance {
    */
   public int getOutputFormatPort();
 
-  /**
-   * Return the last known state (without refreshing)
-   * 
-   * @return
-   */
-
-  public boolean isAlive();
 
   /**
    * Config properties of the Service Instance (llap.daemon.*)

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 1e8c895..0544038 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -16,6 +16,11 @@ package org.apache.hadoop.hive.llap.registry;
 import java.util.Collection;
 import java.util.Set;
 
+/**
+ * Note: For most implementations, there is no guarantee that the ServiceInstance returned by
+ * one invocation is the same object as the one returned by another invocation; e.g. the ZK
+ * registry returns a new ServiceInstance object each time a getInstance call is made.
+ */
 public interface ServiceInstanceSet {
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
index 79b7d51..9f2f3b4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -31,11 +31,6 @@ public class InactiveServiceInstance implements ServiceInstance {
   }
 
   @Override
-  public boolean isAlive() {
-    return false;
-  }
-
-  @Override
   public String getHost() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index bbfcbf6..10ff82e 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -184,11 +184,6 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public boolean isAlive() {
-      return true;
-    }
-
-    @Override
     public Map<String, String> getProperties() {
       Map<String, String> properties = new HashMap<>(srv);
       // no worker identity

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 59f7c9e..525aadb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -22,12 +22,9 @@ import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -60,7 +57,6 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
@@ -404,7 +400,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   private class DynamicServiceInstance implements ServiceInstance {
 
     private final ServiceRecord srv;
-    private boolean alive = true;
     private final String host;
     private final int rpcPort;
     private final int mngPort;
@@ -470,17 +465,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     }
 
     @Override
-    public boolean isAlive() {
-      return alive;
-    }
-
-    public void kill() {
-      // May be possible to generate a notification back to the scheduler from here.
-      LOG.info("Killing service instance: " + this);
-      this.alive = false;
-    }
-
-    @Override
     public Map<String, String> getProperties() {
       return srv.attributes();
     }
@@ -494,7 +478,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
 
     @Override
     public String toString() {
-      return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort +
+      return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
           " with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
           ", servicesAddress=" + getServicesAddress() +  ", mgmtPort=" + getManagementPort() + "]";
     }
@@ -509,8 +493,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       return outputFormatPort;
     }
 
-    // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
-    // of an already known instance.
+    // TODO: This needs a hashCode/equality implementation if used as a key in various structures.
+    // A new ServiceInstance is created each time.
   }
 
   private class DynamicServiceInstanceSet implements ServiceInstanceSet {

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 288a8eb..15a81da 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -279,9 +279,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
 
     // Get the first live service instance
     for (ServiceInstance serviceInstance : serviceInstances) {
-      if (serviceInstance.isAlive()) {
-        return serviceInstance;
-      }
+      return serviceInstance;
     }
 
     LOG.info("No live service instances were found");

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3f0dde5..158772b 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -18,6 +18,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -70,6 +71,8 @@ import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Clock;
@@ -82,6 +85,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -316,8 +320,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
       for (ServiceInstance inst : activeInstances.getAll()) {
-        addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
-            metrics));
+        addNode(new NodeInfo(inst, nodeBlacklistConf, clock,
+            numSchedulableTasksPerNode, metrics), inst);
       }
     } finally {
       writeLock.unlock();
@@ -329,22 +333,31 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     @Override
     public void onCreate(ServiceInstance serviceInstance) {
-      addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
-          numSchedulableTasksPerNode, metrics));
-      LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity());
+      LOG.info("Added node with identity: {} as a result of registry callback",
+          serviceInstance.getWorkerIdentity());
+      addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
+          numSchedulableTasksPerNode, metrics), serviceInstance);
     }
 
     @Override
     public void onUpdate(ServiceInstance serviceInstance) {
-      instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
-          nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
-      LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
+      // TODO In what situations will this be invoked?
+      LOG.warn(
+          "Not expecing Updates from the registry. Received update for instance={}. Ignoring",
+          serviceInstance);
+//     Replacing NodeInfo means we end up discarding whatever state was known about that node.
+//     instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
+//          nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
+//
+//
+//    LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
     }
 
     @Override
     public void onRemove(ServiceInstance serviceInstance) {
-      // FIXME: disabling this for now
-      // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
+      NodeReport nodeReport = constructNodeReport(serviceInstance, false);
+      getContext().nodesUpdated(Collections.singletonList(nodeReport));
+      instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
       LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
       if (metrics != null) {
         metrics.setClusterNodeCount(activeInstances.size());
@@ -447,12 +460,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     try {
       int numInstancesFound = 0;
       for (ServiceInstance inst : activeInstances.getAll()) {
-        if (inst.isAlive()) {
-          Resource r = inst.getResource();
-          memory += r.getMemory();
-          vcores += r.getVirtualCores();
-          numInstancesFound++;
-        }
+        Resource r = inst.getResource();
+        memory += r.getMemory();
+        vcores += r.getVirtualCores();
+        numInstancesFound++;
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("GetTotalResources: numInstancesFound={}, totalMem={}, totalVcores={}",
@@ -475,19 +486,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // need a state store eventually for current state & measure backoffs
     int memory = 0;
     int vcores = 0;
+
     readLock.lock();
     try {
-      for (Entry<String, NodeInfo> entry : instanceToNodeMap.entrySet()) {
-        if (entry.getValue().getServiceInstance().isAlive() && !entry.getValue().isDisabled()) {
-          Resource r = entry.getValue().getServiceInstance().getResource();
+      int numInstancesFound = 0;
+      for (ServiceInstance inst : activeInstances.getAll()) {
+        NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
+        if (nodeInfo != null && !nodeInfo.isDisabled()) {
+          Resource r = inst.getResource();
           memory += r.getMemory();
           vcores += r.getVirtualCores();
+          numInstancesFound++;
         }
       }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GetAvailableResources: numInstancesFound={}, totalMem={}, totalVcores={}",
+            numInstancesFound, memory, vcores);
+      }
     } finally {
       readLock.unlock();
     }
-
     return Resource.newInstance(memory, vcores);
   }
 
@@ -495,13 +513,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   public int getClusterNodeCount() {
     readLock.lock();
     try {
-      int n = 0;
-      for (ServiceInstance inst : activeInstances.getAll()) {
-        if (inst.isAlive()) {
-          n++;
-        }
-      }
-      return n;
+      return activeInstances.getAll().size();
     } finally {
       readLock.unlock();
     }
@@ -516,17 +528,22 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       metrics.incrCompletedDagCount();
     }
     dagStats = new StatsPerDag();
+    // TODO Cleanup pending tasks etc, so that the next dag is not affected.
   }
 
   @Override
   public void blacklistNode(NodeId nodeId) {
     LOG.info("BlacklistNode not supported");
+    // TODO Disable blacklisting in Tez when using LLAP, until this is properly supported.
+    // Blacklisting can cause containers to move to a terminating state, which can cause attempts to be marked as failed.
+    // This becomes problematic when we set #allowedFailures to 0
     // TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted.
   }
 
   @Override
   public void unblacklistNode(NodeId nodeId) {
     LOG.info("unBlacklistNode not supported");
+    // TODO: See comments under blacklistNode.
   }
 
   @Override
@@ -598,26 +615,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         }
         return false;
       }
-      ServiceInstance assignedInstance = taskInfo.assignedInstance;
-      assert assignedInstance != null;
-
-      NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance.getWorkerIdentity());
+      NodeInfo nodeInfo = taskInfo.assignedNode;
       assert nodeInfo != null;
 
-
       LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", taskInfo.task,
           taskInfo.getState(), endReason);
       // Re-enable the node if preempted
       if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
         LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
-        unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
+        unregisterPendingPreemption(taskInfo.assignedNode.getHost());
         nodeInfo.registerUnsuccessfulTaskEnd(true);
         if (nodeInfo.isDisabled()) {
-          // Re-enable the node. If a task succeeded, a slot may have become available.
-          // Also reset commFailures since a task was able to communicate back and indicate success.
-          nodeInfo.enableNode();
-          // Re-insert into the queue to force the poll thread to remove the element.
-          reinsertNodeInfo(nodeInfo);
+          // Re-enable the node, if a task completed due to preemption. Capacity has become available,
+          // and we may have been able to communicate with the node.
+          queueNodeForReEnablement(nodeInfo);
         }
         // In case of success, trigger a scheduling run for pending tasks.
         trySchedulingPendingTasks();
@@ -631,9 +642,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           if (nodeInfo.isDisabled()) {
             // Re-enable the node. If a task succeeded, a slot may have become available.
             // Also reset commFailures since a task was able to communicate back and indicate success.
-            nodeInfo.enableNode();
-            // Re-insert into the queue to force the poll thread to remove the element.
-            reinsertNodeInfo(nodeInfo);
+            queueNodeForReEnablement(nodeInfo);
           }
           // In case of success, trigger a scheduling run for pending tasks.
           trySchedulingPendingTasks();
@@ -644,21 +653,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
               .contains(endReason)) {
             if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
-              dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
+              dagStats.registerCommFailure(taskInfo.assignedNode.getHost());
             } else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) {
-              dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+              dagStats.registerTaskRejected(taskInfo.assignedNode.getHost());
             }
           }
           if (endReason != null && endReason == TaskAttemptEndReason.NODE_FAILED) {
             LOG.info(
-                "Task {} ended on {} nodeInfo.toString() with a NODE_FAILED message." +
-                    " An message should come in from the registry to disable this node unless" +
+                "Task {} ended on {} with a NODE_FAILED message." +
+                    " A message should come in from the registry to disable this node unless" +
                     " this was a temporary communication failure",
-                task, assignedInstance);
+                task, nodeInfo.toShortString());
           }
           boolean commFailure =
               endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
-          disableInstance(assignedInstance, commFailure);
+          disableNode(nodeInfo, commFailure);
         }
       }
     } finally {
@@ -668,15 +677,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return true;
   }
 
-  private void reinsertNodeInfo(final NodeInfo nodeInfo) {
-    if ( disabledNodesQueue.remove(nodeInfo)) {
-      disabledNodesQueue.add(nodeInfo);
-    }
-    if (metrics != null) {
-      metrics.setDisabledNodeCount(disabledNodesQueue.size());
-    }
-  }
-
   @Override
   public Object deallocateContainer(ContainerId containerId) {
     LOG.debug("Ignoring deallocateContainer for containerId: " + containerId);
@@ -730,11 +730,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
                 if  (nodeInfo.canAcceptTask()) {
                   // Successfully scheduled.
                   LOG.info(
-                      "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host +
+                      "Assigning " + nodeInfo.toShortString() + " when looking for " + host +
                           ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
                           (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
                               ""));
-                  return new SelectHostResult(inst, nodeInfo);
+                  return new SelectHostResult(nodeInfo);
                 } else {
                   // The node cannot accept a task at the moment.
                   if (shouldDelayForLocality) {
@@ -755,8 +755,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
                 }
               } else {
                 LOG.warn(
-                    "Null NodeInfo when attempting to get host with worker identity {}, and host {}",
-                    inst.getWorkerIdentity(), host);
+                    "Null NodeInfo when attempting to get host with worker {}, and host {}",
+                    inst, host);
                 // Leave requestedHostWillBecomeAvailable as is. If some other host is found - delay,
                 // else ends up allocating to a random host immediately.
               }
@@ -798,56 +798,46 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         return SELECT_HOST_RESULT_DELAYED_RESOURCES;
       }
       NodeInfo randomNode = all.get(random.nextInt(all.size()));
-      LOG.info("Assigning " + nodeToString(randomNode.getServiceInstance(), randomNode)
+      LOG.info("Assigning " + randomNode.toShortString()
           + " when looking for any host, from #hosts=" + all.size() + ", requestedHosts="
           + ((requestedHosts == null || requestedHosts.length == 0)
               ? "null" : Arrays.toString(requestedHosts)));
-      return new SelectHostResult(randomNode.getServiceInstance(), randomNode);
+      return new SelectHostResult(randomNode);
     } finally {
       readLock.unlock();
     }
   }
 
-  private void scanForNodeChanges() {
-    /* check again whether nodes are disabled or just missing */
-    writeLock.lock();
-    try {
-      for (ServiceInstance inst : activeInstances.getAll()) {
-        if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) {
-          /* that's a good node, not added to the allocations yet */
-          LOG.info("Found a new node: " + inst + ".");
-          addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
-              metrics));
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void addNode(ServiceInstance inst, NodeInfo node) {
+  private void addNode(NodeInfo node, ServiceInstance serviceInstance) {
     // we have just added a new node. Signal timeout monitor to reset timer
     if (activeInstances.size() == 1) {
       LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
       stopTimeoutMonitor();
     }
-    instanceToNodeMap.put(inst.getWorkerIdentity(), node);
+
+    NodeReport nodeReport = constructNodeReport(serviceInstance, true);
+    getContext().nodesUpdated(Collections.singletonList(nodeReport));
+
+    instanceToNodeMap.put(node.getNodeIdentity(), node);
     if (metrics != null) {
       metrics.setClusterNodeCount(activeInstances.size());
     }
     // Trigger scheduling since a new node became available.
+    LOG.info("Adding new node: {}", node);
     trySchedulingPendingTasks();
   }
 
   private void reenableDisabledNode(NodeInfo nodeInfo) {
     writeLock.lock();
     try {
-      LOG.info("Attempting to re-enable node: " + nodeInfo.getServiceInstance());
-      if (nodeInfo.getServiceInstance().isAlive()) {
+      LOG.info("Attempting to re-enable node: " + nodeInfo.toShortString());
+      if (activeInstances.getInstance(nodeInfo.getNodeIdentity()) != null) {
         nodeInfo.enableNode();
       } else {
         if (LOG.isInfoEnabled()) {
-          LOG.info("Removing dead node " + nodeInfo);
+          LOG.info(
+              "Not re-enabling node: {}, since it is not present in the RegistryActiveNodeList",
+              nodeInfo.toShortString());
         }
       }
     } finally {
@@ -855,13 +845,32 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  private void disableInstance(ServiceInstance instance, boolean isCommFailure) {
+  /**
+   * Marks the node as enabled and, if it is queued, re-inserts it into the disabledNodesQueue
+   * so that the poll thread removes it and completes the actual re-enablement of the node.
+   * @param nodeInfo  the node to be re-enabled
+   */
+  private void queueNodeForReEnablement(final NodeInfo nodeInfo) {
+    nodeInfo.enableNode();
+    if ( disabledNodesQueue.remove(nodeInfo)) {
+      disabledNodesQueue.add(nodeInfo);
+    }
+    if (metrics != null) {
+      metrics.setDisabledNodeCount(disabledNodesQueue.size());
+    }
+  }
+
+  private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) {
     writeLock.lock();
     try {
-      NodeInfo nodeInfo = instanceToNodeMap.get(instance.getWorkerIdentity());
       if (nodeInfo == null || nodeInfo.isDisabled()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
+          if (nodeInfo != null) {
+            LOG.debug("Node: " + nodeInfo.toShortString() +
+                " already disabled, or invalid. Not doing anything.");
+          } else {
+            LOG.debug("Ignoring disableNode invocation for null NodeInfo");
+          }
         }
       } else {
         nodeInfo.disableNode(isCommFailure);
@@ -879,6 +888,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  private static NodeReport constructNodeReport(ServiceInstance serviceInstance,
+                                         boolean healthy) {
+    NodeReport nodeReport = NodeReport.newInstance(NodeId
+            .newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort()),
+        healthy ? NodeState.RUNNING : NodeState.LOST,
+        serviceInstance.getServicesAddress(), null, null,
+        null, 0, "", 0l);
+    return nodeReport;
+  }
+
   private void addPendingTask(TaskInfo taskInfo) {
     writeLock.lock();
     try {
@@ -1124,21 +1143,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private ScheduleResult scheduleTask(TaskInfo taskInfo) {
     SelectHostResult selectHostResult = selectHost(taskInfo);
     if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
-      NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair;
+      NodeInfo nodeInfo = selectHostResult.nodeInfo;
       Container container =
           containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
-              nsPair.getServiceInstance().getHost(),
-              nsPair.getServiceInstance().getRpcPort(),
-              nsPair.getServiceInstance().getServicesAddress());
+              nodeInfo.getHost(),
+              nodeInfo.getRpcPort(),
+              nodeInfo.getServiceAddress());
       writeLock.lock(); // While updating local structures
       try {
-        LOG.info("Assigned task {} to container {} on node={}", taskInfo, container.getId(),
-            nsPair.getServiceInstance());
+        LOG.info("Assigned task={} on node={}, to container={} on node={}",
+            taskInfo, nodeInfo.toShortString(), container.getId());
         dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
-            nsPair.getServiceInstance().getHost());
-        taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime());
+            nodeInfo.getHost());
+        taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime());
         registerRunningTask(taskInfo);
-        nsPair.getNodeInfo().registerTaskScheduled();
+        nodeInfo.registerTaskScheduled();
       } finally {
         writeLock.unlock();
       }
@@ -1169,7 +1188,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
           while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
             TaskInfo taskInfo = taskInfoIterator.next();
-            if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) {
+            if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) {
               // Candidate for preemption.
               preemptedCount++;
               LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
@@ -1178,9 +1197,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
               if (preemptedTaskList == null) {
                 preemptedTaskList = new LinkedList<>();
               }
-              dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
+              dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost());
               preemptedTaskList.add(taskInfo);
-              registerPendingPreemption(taskInfo.assignedInstance.getHost());
+              registerPendingPreemption(taskInfo.assignedNode.getHost());
               // Remove from the runningTaskList
               taskInfoIterator.remove();
             }
@@ -1256,14 +1275,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
-    return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" +
-        serviceInstance.getWorkerIdentity() + ", webAddress=" +
-        serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks;
-  }
-
-
-
   // ------ Inner classes defined after this point ------
 
   @VisibleForTesting
@@ -1322,37 +1333,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private class NodeEnablerCallable implements Callable<Void> {
 
     private final AtomicBoolean isShutdown = new AtomicBoolean(false);
-    private static final long REFRESH_INTERVAL = 10000l;
-    long nextPollInterval = REFRESH_INTERVAL;
-    long lastRefreshTime;
+    private static final long POLL_TIMEOUT = 10000L;
 
     @Override
     public Void call() {
 
-      lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
-          while (true) {
-            NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
-            if (nodeInfo != null) {
-              long currentTime =  LlapTaskSchedulerService.this.clock.getTime();
-              // A node became available. Enable the node and try scheduling.
-              reenableDisabledNode(nodeInfo);
-              trySchedulingPendingTasks();
-
-              nextPollInterval -= (currentTime - lastRefreshTime);
-            }
-
-            if (nextPollInterval < 0 || nodeInfo == null) {
-              // timeout expired. Reset the poll interval and refresh nodes.
-              nextPollInterval = REFRESH_INTERVAL;
-              lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
-              // TODO Get rid of this polling once we have notificaitons from the registry sub-system
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Refreshing instances based on poll interval");
-              }
-              scanForNodeChanges();
-            }
+          NodeInfo nodeInfo =
+              disabledNodesQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+          if (nodeInfo != null) {
+            // A node became available. Enable the node and try scheduling.
+            reenableDisabledNode(nodeInfo);
+            trySchedulingPendingTasks();
           }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
@@ -1393,8 +1386,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
             "No LLAP Daemons are running", getContext().getCurrentDagInfo());
       } catch (Exception e) {
+        DagInfo currentDagInfo = getContext().getCurrentDagInfo();
         LOG.error("Exception when reporting SERVICE_UNAVAILABLE error for dag: {}",
-            getContext().getCurrentDagInfo().getName(), e);
+            currentDagInfo == null ? "" : currentDagInfo.getName(), e);
       }
     }
   }
@@ -1505,8 +1499,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     }
 
-    ServiceInstance getServiceInstance() {
-      return serviceInstance;
+    String getNodeIdentity() {
+      return serviceInstance.getWorkerIdentity();
+    }
+
+    String getHost() {
+      return serviceInstance.getHost();
+    }
+
+    int getRpcPort() {
+      return serviceInstance.getRpcPort();
+    }
+
+    String getServiceAddress() {
+      return serviceInstance.getServicesAddress();
     }
 
     void enableNode() {
@@ -1535,7 +1541,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         delayTime = blacklistConf.maxDelay;
       }
       if (LOG.isInfoEnabled()) {
-        LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", serviceInstance,
+        LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}",
+            serviceInstance,
             delayTime, commFailure);
       }
       expireTimeMillis = currentTime + delayTime;
@@ -1577,7 +1584,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     /**
      * @return the time at which this node will be re-enabled
      */
-    public long getEnableTime() {
+    long getEnableTime() {
       return expireTimeMillis;
     }
 
@@ -1585,22 +1592,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return disabled;
     }
 
-    public boolean hadCommFailure() {
+    boolean hadCommFailure() {
       return hadCommFailure;
     }
 
     /* Returning true does not guarantee that the task will run, considering other queries
     may be running in the system. Also depends upon the capacity usage configuration
      */
-    public boolean canAcceptTask() {
-      boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
+    boolean canAcceptTask() {
+      boolean result = !hadCommFailure && !disabled
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
       if (LOG.isInfoEnabled()) {
         LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
                 serviceInstance.getWorkerIdentity() + "]: " +
-                "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}",
-            result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled,
-            serviceInstance.isAlive());
+                "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}",
+            result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled);
       }
       return result;
     }
@@ -1634,6 +1640,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           + ", commFailures=" + hadCommFailure
           +'}';
     }
+
+    private String toShortString() {
+      return "{" + getHost() + ":" + getRpcPort()
+          + ", id=" + getNodeIdentity()
+          + ", stc=" + numSchedulableTasks + "}";
+    }
+
   }
 
   @VisibleForTesting
@@ -1699,6 +1712,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       _registerAllocationInHostMap(allocatedHost, numAllocationsPerHost);
     }
 
+    // TODO Track stats of rejections etc per host
     void registerTaskPreempted(String host) {
       numPreemptedTasks++;
     }
@@ -1753,7 +1767,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     long startTime;
     long preemptTime;
     ContainerId containerId;
-    ServiceInstance assignedInstance;
+    NodeInfo assignedNode;
     private State state = State.PENDING;
     boolean inDelayedQueue = false;
 
@@ -1782,8 +1796,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       this.uniqueId = ID_GEN.getAndIncrement();
     }
 
-    synchronized void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
-      this.assignedInstance = instance;
+    synchronized void setAssignmentInfo(NodeInfo nodeInfo, ContainerId containerId, long startTime) {
+      this.assignedNode = nodeInfo;
       this.containerId = containerId;
       this.startTime = startTime;
       this.state = State.ASSIGNED;
@@ -1859,7 +1873,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           ", priority=" + priority +
           ", startTime=" + startTime +
           ", containerId=" + containerId +
-          ", assignedInstance=" + assignedInstance +
+          ", assignedNode=" + (assignedNode == null ? "" : assignedNode.toShortString()) +
           ", uniqueId=" + uniqueId +
           ", localityDelayTimeout=" + localityDelayTimeout +
           '}';
@@ -1907,16 +1921,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   }
 
   private static class SelectHostResult {
-    final NodeServiceInstancePair nodeServiceInstancePair;
+    final NodeInfo nodeInfo;
     final ScheduleResult scheduleResult;
 
-    SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
-      this.nodeServiceInstancePair = new NodeServiceInstancePair(serviceInstance, nodeInfo);
+    SelectHostResult(NodeInfo nodeInfo) {
+      this.nodeInfo = nodeInfo;
       this.scheduleResult = ScheduleResult.SCHEDULED;
     }
 
     SelectHostResult(ScheduleResult scheduleResult) {
-      this.nodeServiceInstancePair = null;
+      this.nodeInfo = null;
       this.scheduleResult = scheduleResult;
     }
   }
@@ -1928,24 +1942,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES =
       new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);
 
-  private static class NodeServiceInstancePair {
-    final NodeInfo nodeInfo;
-    final ServiceInstance serviceInstance;
-
-    private NodeServiceInstancePair(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
-      this.serviceInstance = serviceInstance;
-      this.nodeInfo = nodeInfo;
-    }
-
-    public ServiceInstance getServiceInstance() {
-      return serviceInstance;
-    }
-
-    public NodeInfo getNodeInfo() {
-      return nodeInfo;
-    }
-  }
-
   private static final class NodeBlacklistConf {
     private final long minDelay;
     private final long maxDelay;
@@ -1986,4 +1982,5 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           '}';
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 402658b..85d2bcd 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -210,7 +210,7 @@ public class TestLlapTaskSchedulerService {
       assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
       LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
       assertNotNull(disabledNodeInfo);
-      assertEquals(HOST1, disabledNodeInfo.serviceInstance.getHost());
+      assertEquals(HOST1, disabledNodeInfo.getHost());
       assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS));
       assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis);