Posted to commits@hive.apache.org by se...@apache.org on 2017/02/27 20:24:19 UTC

[17/20] hive git commit: HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)

HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f

Branch: refs/heads/hive-14535
Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f
Parents: 7f1c29e
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Feb 27 11:54:45 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Feb 27 11:54:45 2017 -0800

----------------------------------------------------------------------
 .../tezplugins/LlapTaskSchedulerService.java    | 110 ++++++----
 .../TestLlapTaskSchedulerService.java           | 199 +++++++++++++++++--
 2 files changed, 260 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
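In outline: when a fragment had no locality preference (or its preferred host
missed), the scheduler walked the consistent host list from the beginning, so
such fragments stacked up on the first hosts with free capacity. This patch
instead picks uniformly at random among active nodes that currently have free
slots. A minimal standalone sketch of that selection policy follows; NodeInfo
and canAcceptTask() mirror the scheduler's names, but the simplified classes
around them are illustrative, not the patch itself.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative stand-in for the scheduler's NodeInfo.
class NodeInfo {
  final String host;
  int freeSlots;
  NodeInfo(String host, int freeSlots) { this.host = host; this.freeSlots = freeSlots; }
  boolean canAcceptTask() { return freeSlots > 0; }
  String getHost() { return host; }
}

class NoLocalityPolicy {
  private final Random random = new Random();

  // Collect the nodes that can take a task right now and pick one at random.
  // Scanning the ordered list from index 0 (the old behavior) always filled
  // up the earliest hosts first.
  NodeInfo select(List<NodeInfo> allNodes) {
    List<NodeInfo> free = new ArrayList<>();
    for (NodeInfo n : allNodes) {
      if (n != null && n.canAcceptTask()) { // null marks an inactive instance
        free.add(n);
      }
    }
    if (free.isEmpty()) {
      return null; // caller delays the request until capacity frees up
    }
    return free.get(random.nextInt(free.size()));
  }
}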


http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 97191f8..fe73ff1 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
@@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private final LlapTaskSchedulerMetrics metrics;
   private final JvmPauseMonitor pauseMonitor;
+  private final Random random = new Random();
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
     this(taskSchedulerContext, new MonotonicClock(), true);
@@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  @VisibleForTesting
+  public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) {
+    this.activeInstances = serviceInstanceSet;
+  }
+
   private class NodeStateChangeListener implements ServiceInstanceStateChangeListener {
     private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class);
 
@@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
       /* fall through - miss in locality or no locality-requested */
       Collection<ServiceInstance> instances = activeInstances.getAllInstancesOrdered(true);
-      ArrayList<NodeInfo> allNodes = new ArrayList<>(instances.size());
+      List<NodeInfo> allNodes = new ArrayList<>(instances.size());
+      List<NodeInfo> activeNodesWithFreeSlots = new ArrayList<>();
       for (ServiceInstance inst : instances) {
-        NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
-        if (nodeInfo != null) {
-          allNodes.add(nodeInfo);
+        if (inst instanceof InactiveServiceInstance) {
+          allNodes.add(null);
+        } else {
+          NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
+          if (nodeInfo == null) {
+            allNodes.add(null);
+          } else {
+            allNodes.add(nodeInfo);
+            if (nodeInfo.canAcceptTask()) {
+              activeNodesWithFreeSlots.add(nodeInfo);
+            }
+          }
         }
       }
 
+      if (allNodes.isEmpty()) {
+        return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+      }
+
+      // no locality requested, randomly pick a node with free slots
       if (requestedHosts == null || requestedHosts.length == 0) {
-        // no locality-requested, iterate the available hosts in consistent order from the beginning
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task);
-        }
-        for (NodeInfo nodeInfo : allNodes) {
-          if (nodeInfo.canAcceptTask()) {
-            LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}",
-                nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0)
-                    ? "null" : requestedHostsDebugStr));
-            return new SelectHostResult(nodeInfo);
-          }
-        }
-      } else {
-        // miss in locality request, try the next available host that can accept tasks (assume the consistent instances
-        // list as a ring) from the index of first requested host
-        final String firstRequestedHost = requestedHosts[0];
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Locality miss. Attempting to allocate next available host from first requested host({}) for " +
-            "task={}", firstRequestedHost, request.task);
+          LOG.debug("No-locality requested. Selecting a random host for task={}", request.task);
         }
-        int requestedHostIdx = -1;
-        for (int i = 0; i < allNodes.size(); i++) {
-          if (allNodes.get(i).getHost().equals(firstRequestedHost)) {
+        return randomSelection(activeNodesWithFreeSlots);
+      }
+
+      // miss in locality request, try picking consistent location with fallback to random selection
+      final String firstRequestedHost = requestedHosts[0];
+      int requestedHostIdx = -1;
+      for (int i = 0; i < allNodes.size(); i++) {
+        NodeInfo nodeInfo = allNodes.get(i);
+        if (nodeInfo != null) {
+          if (nodeInfo.getHost().equals(firstRequestedHost)) {
             requestedHostIdx = i;
             break;
           }
         }
+      }
 
-        for (int i = 0; i < allNodes.size(); i++) {
-          NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
-          if (nodeInfo.canAcceptTask()) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Assigning {} when looking for first requested host, from #hosts={},"
-                      + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(),
-                  ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
-                      requestedHostsDebugStr));
-            }
-            return new SelectHostResult(nodeInfo);
+      // requested host died or an unknown host was requested; fall back to random selection.
+      // TODO: At this point we don't know the slot number of the requested host, so we can't roll over to the next available
+      if (requestedHostIdx == -1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Requested node [{}] in consistent order does not exist. Falling back to random selection for " +
+            "request {}", firstRequestedHost, request);
+        }
+        return randomSelection(activeNodesWithFreeSlots);
+      }
+
+      // requested host is still alive but cannot accept the task; pick the next available host in consistent order
+      for (int i = 0; i < allNodes.size(); i++) {
+        NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
+        // next node in consistent order died or has no free slots, roll over to the next
+        if (nodeInfo == null || !nodeInfo.canAcceptTask()) {
+          continue;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Assigning {} in consistent order when looking for first requested host, from #hosts={},"
+                + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(),
+              ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+                requestedHostsDebugStr));
           }
+          return new SelectHostResult(nodeInfo);
         }
       }
 
@@ -861,6 +888,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  private SelectHostResult randomSelection(final List<NodeInfo> nodesWithFreeSlots) {
+    if (nodesWithFreeSlots.isEmpty()) {
+      return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+    }
+
+    NodeInfo randomNode = nodesWithFreeSlots.get(random.nextInt(nodesWithFreeSlots.size()));
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts=null",
+        randomNode.toShortString(), nodesWithFreeSlots.size());
+    }
+    return new SelectHostResult(randomNode);
+  }
+
   private void addNode(NodeInfo node, ServiceInstance serviceInstance) {
     // we have just added a new node. Signal timeout monitor to reset timer
     if (activeInstances.size() == 1) {

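The locality-miss path above keeps the consistent ordering but now treats the
node list as a ring that may contain dead entries (null placeholders for
inactive instances). A rough sketch of that rollover, reusing the illustrative
NodeInfo from the earlier sketch; the helper itself is hypothetical, not the
patch's API:

import java.util.List;

class RolloverPolicy {
  // Start just after the first requested host's index and walk the ring,
  // skipping entries that are dead (null) or have no free slots.
  static NodeInfo nextAvailable(List<NodeInfo> allNodes, int requestedHostIdx) {
    for (int i = 0; i < allNodes.size(); i++) {
      NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
      if (nodeInfo != null && nodeInfo.canAcceptTask()) {
        return nodeInfo;
      }
    }
    return null; // every node is dead or full; caller delays the request
  }
}

If the requested host is not found in the ring at all (requestedHostIdx == -1),
the patch falls back to the same random selection, since the dead host's slot
in the consistent order is unknown.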
http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index d60635b..339f513 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +40,9 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
 import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
@@ -58,6 +63,8 @@ import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableSet;
+
 public class TestLlapTaskSchedulerService {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class);
@@ -65,6 +72,7 @@ public class TestLlapTaskSchedulerService {
   private static final String HOST1 = "host1";
   private static final String HOST2 = "host2";
   private static final String HOST3 = "host3";
+  private static final String HOST4 = "host4";
 
   @Test(timeout = 10000)
   public void testSimpleLocalAllocation() throws IOException, InterruptedException {
@@ -456,8 +464,8 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test(timeout = 10000000)
-  public void testFallbackAllocationOrderedNext() throws IOException, InterruptedException {
+  @Test(timeout = 10000)
+  public void testHostPreferenceUnknownAndNotSpecified() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);
 
     String[] hostsKnown = new String[]{HOST1, HOST2};
@@ -476,10 +484,72 @@ public class TestLlapTaskSchedulerService {
       Object task3 = "task3";
       Object clientCookie3 = "cookie3";
 
+      Object task4 = "task4";
+      Object clientCookie4 = "cookie4";
+
       tsWrapper.controlScheduler(true);
-      tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1);
+      tsWrapper.allocateTask(task1, hostsKnown, priority1, clientCookie1);
       tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
-      tsWrapper.allocateTask(task3, noHosts, priority1, clientCookie3);
+      tsWrapper.allocateTask(task3, hostsUnknown, priority1, clientCookie3);
+      tsWrapper.allocateTask(task4, noHosts, priority1, clientCookie4);
+
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 4) {
+          break;
+        }
+      }
+
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(4))
+        .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(4, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      assertEquals(task3, argumentCaptor.getAllValues().get(2));
+      assertEquals(task4, argumentCaptor.getAllValues().get(3));
+      // 1st task requested host1, got host1
+      assertEquals(HOST1, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      // 2nd task requested host1, got host1
+      assertEquals(HOST1, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+      // 3rd task requested an unknown host, got host2 since host1 is full and only host2 is left in the random pool
+      assertEquals(HOST2, argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+      // 4th task provided no location preference, got host2 since host1 is full and only host2 is left in the random pool
+      assertEquals(HOST2, argumentCaptor2.getAllValues().get(3).getNodeId().getHost());
+
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testHostPreferenceMissesConsistentRollover() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+
+    String[] hostsKnown = new String[]{HOST1, HOST2, HOST3};
+    String[] hostsLive = new String[]{HOST1, HOST2, HOST3};
+    String[] hostsH2 = new String[]{HOST2};
+    TestTaskSchedulerServiceWrapper tsWrapper =
+      new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 0, 0l, false, hostsLive, true);
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, hostsH2, priority1, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsH2, priority1, clientCookie2);
+      tsWrapper.allocateTask(task3, hostsH2, priority1, clientCookie3);
 
       while (true) {
         tsWrapper.signalSchedulerRun();
@@ -495,18 +565,79 @@ public class TestLlapTaskSchedulerService {
         .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
       assertEquals(3, argumentCaptor.getAllValues().size());
       assertEquals(task1, argumentCaptor.getAllValues().get(0));
-      // 1st task provided unknown host location, it should be assigned first host
-      assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
       assertEquals(task2, argumentCaptor.getAllValues().get(1));
-      // 2nd task provided host1 as location preference, it should be assigned host1 as it has capacity
-      assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
       assertEquals(task3, argumentCaptor.getAllValues().get(2));
-      // 3rd task provided no location preference, it is tried with host1 but it is full, so gets assigned host2
-      assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+      // 1st task requested host2, got host2
+      assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      // 2nd task requested host2, got host3 as host2 is full
+      assertEquals(HOST3, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+      // 3rd task requested host2, got host1 as host2 and host3 are full
+      assertEquals(HOST1, argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
 
-      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+      verify(tsWrapper.mockServiceInstanceSet, times(2)).getAllInstancesOrdered(true);
+
+      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
-      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations);
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testHostPreferenceMissesConsistentPartialAlive() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+
+    String[] hostsKnown = new String[]{HOST1, HOST2, HOST3, HOST4};
+    String[] hostsLive = new String[]{HOST1, HOST2, null, HOST4}; // host3 dead before scheduling
+    String[] hostsH2 = new String[]{HOST2};
+    String[] hostsH3 = new String[]{HOST3};
+    TestTaskSchedulerServiceWrapper tsWrapper =
+      new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 0, 0l, false, hostsLive, true);
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, hostsH2, priority1, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsH2, priority1, clientCookie2);
+      tsWrapper.allocateTask(task3, hostsH3, priority1, clientCookie3);
+
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+          break;
+        }
+      }
+
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(3))
+        .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(3, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      assertEquals(task3, argumentCaptor.getAllValues().get(2));
+
+      // 1st task requested host2, got host2
+      assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      // 2nd task requested host2, got host4 since host2 is full and host3 is dead
+      assertEquals(HOST4, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+      // 3rd task requested host3, got host1 since host3 is dead and host4 is full
+      assertEquals(HOST1, argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+
+      verify(tsWrapper.mockServiceInstanceSet, times(2)).getAllInstancesOrdered(true);
+
+      assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+      assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations);
     } finally {
       tsWrapper.shutdown();
     }
@@ -1316,6 +1447,7 @@ public class TestLlapTaskSchedulerService {
     static final Resource resource = Resource.newInstance(1024, 1);
     Configuration conf;
     TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
+    ServiceInstanceSet mockServiceInstanceSet = mock(ServiceInstanceSet.class);
     ControlledClock clock = new ControlledClock(new MonotonicClock());
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
     LlapTaskSchedulerServiceForTest ts;
@@ -1344,12 +1476,20 @@ public class TestLlapTaskSchedulerService {
     TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
                                     int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
         IOException, InterruptedException {
+      this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, controlledDelayedTaskQueue,
+        hosts, false);
+    }
+
+    TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+      int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue, String[] liveHosts,
+      boolean useMockRegistry) throws
+      IOException, InterruptedException {
       conf = new Configuration();
       conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
       conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
       conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
       conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
-          nodeDisableTimeoutMillis + "ms");
+        nodeDisableTimeoutMillis + "ms");
       conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
       conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
 
@@ -1358,15 +1498,46 @@ public class TestLlapTaskSchedulerService {
       UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
       doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
 
+      if (useMockRegistry) {
+        List<ServiceInstance> liveInstances = new ArrayList<>();
+        for (String host : liveHosts) {
+          if (host == null) {
+            ServiceInstance mockInactive = mock(InactiveServiceInstance.class);
+            doReturn(host).when(mockInactive).getHost();
+            doReturn("inactive-host-" + host).when(mockInactive).getWorkerIdentity();
+            doReturn(ImmutableSet.builder().add(mockInactive).build()).when(mockServiceInstanceSet).getByHost(host);
+            liveInstances.add(mockInactive);
+          } else {
+            ServiceInstance mockActive = mock(ServiceInstance.class);
+            doReturn(host).when(mockActive).getHost();
+            doReturn("host-" + host).when(mockActive).getWorkerIdentity();
+            doReturn(ImmutableSet.builder().add(mockActive).build()).when(mockServiceInstanceSet).getByHost(host);
+            liveInstances.add(mockActive);
+          }
+        }
+        doReturn(liveInstances).when(mockServiceInstanceSet).getAllInstancesOrdered(true);
+
+        List<ServiceInstance> allInstances = new ArrayList<>();
+        for (String host : hosts) {
+          ServiceInstance mockActive = mock(ServiceInstance.class);
+          doReturn(host).when(mockActive).getHost();
+          doReturn(Resource.newInstance(100, 1)).when(mockActive).getResource();
+          doReturn("host-" + host).when(mockActive).getWorkerIdentity();
+          allInstances.add(mockActive);
+        }
+        doReturn(allInstances).when(mockServiceInstanceSet).getAll();
+      }
       if (controlledDelayedTaskQueue) {
         ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
       } else {
         ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
       }
-
       controlScheduler(true);
       ts.initialize();
       ts.start();
+      if (useMockRegistry) {
+        ts.setServiceInstanceSet(mockServiceInstanceSet);
+      }
       // One scheduler pass from the nodes that are added at startup
       signalSchedulerRun();
       controlScheduler(false);