Posted to commits@hive.apache.org by se...@apache.org on 2017/02/27 20:24:19 UTC
[17/20] hive git commit: HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)
HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f
Branch: refs/heads/hive-14535
Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f
Parents: 7f1c29e
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Mon Feb 27 11:54:45 2017 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Mon Feb 27 11:54:45 2017 -0800
----------------------------------------------------------------------
.../tezplugins/LlapTaskSchedulerService.java | 110 ++++++----
.../TestLlapTaskSchedulerService.java | 199 +++++++++++++++++--
2 files changed, 260 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
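The gist of the change, for readers skimming the diff below: previously, fragments with no locality preference (or whose locality missed) were always matched against the ordered host list from a fixed starting point, so the first hosts with capacity absorbed every such fragment. The patch instead collects the nodes that currently have free slots and picks one uniformly at random. A minimal, self-contained sketch of the before/after selection logic, using a hypothetical Node type rather than Hive's actual NodeInfo:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class RandomFallbackSketch {
  // Hypothetical stand-in for NodeInfo: a host name plus a free-slot check.
  static final class Node {
    final String host;
    int freeSlots;
    Node(String host, int freeSlots) { this.host = host; this.freeSlots = freeSlots; }
    boolean canAcceptTask() { return freeSlots > 0; }
  }

  private final Random random = new Random();

  // Old behavior: the first host with capacity always wins, so no-locality
  // fragments stack up on the hosts at the front of the consistent order.
  Node firstAvailable(List<Node> nodes) {
    for (Node n : nodes) {
      if (n.canAcceptTask()) {
        return n;
      }
    }
    return null; // no capacity anywhere: the request is delayed
  }

  // New behavior: pick uniformly at random among nodes with free slots,
  // spreading no-locality fragments across the cluster.
  Node randomAvailable(List<Node> nodes) {
    List<Node> free = new ArrayList<>();
    for (Node n : nodes) {
      if (n.canAcceptTask()) {
        free.add(n);
      }
    }
    if (free.isEmpty()) {
      return null; // no capacity anywhere: the request is delayed
    }
    return free.get(random.nextInt(free.size()));
  }
}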
http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 97191f8..fe73ff1 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
@@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final LlapTaskSchedulerMetrics metrics;
private final JvmPauseMonitor pauseMonitor;
+ private final Random random = new Random();
public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
this(taskSchedulerContext, new MonotonicClock(), true);
@@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ @VisibleForTesting
+ public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) {
+ this.activeInstances = serviceInstanceSet;
+ }
+
private class NodeStateChangeListener implements ServiceInstanceStateChangeListener {
private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class);
@@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler {
/* fall through - miss in locality or no locality-requested */
Collection<ServiceInstance> instances = activeInstances.getAllInstancesOrdered(true);
- ArrayList<NodeInfo> allNodes = new ArrayList<>(instances.size());
+ List<NodeInfo> allNodes = new ArrayList<>(instances.size());
+ List<NodeInfo> activeNodesWithFreeSlots = new ArrayList<>();
for (ServiceInstance inst : instances) {
- NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
- if (nodeInfo != null) {
- allNodes.add(nodeInfo);
+ if (inst instanceof InactiveServiceInstance) {
+ allNodes.add(null);
+ } else {
+ NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
+ if (nodeInfo == null) {
+ allNodes.add(null);
+ } else {
+ allNodes.add(nodeInfo);
+ if (nodeInfo.canAcceptTask()) {
+ activeNodesWithFreeSlots.add(nodeInfo);
+ }
+ }
}
}
+ if (allNodes.isEmpty()) {
+ return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+ }
+
+ // no locality requested, randomly pick a node with free slots
if (requestedHosts == null || requestedHosts.length == 0) {
- // no locality-requested, iterate the available hosts in consistent order from the beginning
- if (LOG.isDebugEnabled()) {
- LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task);
- }
- for (NodeInfo nodeInfo : allNodes) {
- if (nodeInfo.canAcceptTask()) {
- LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}",
- nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0)
- ? "null" : requestedHostsDebugStr));
- return new SelectHostResult(nodeInfo);
- }
- }
- } else {
- // miss in locality request, try the next available host that can accept tasks (assume the consistent instances
- // list as a ring) from the index of first requested host
- final String firstRequestedHost = requestedHosts[0];
if (LOG.isDebugEnabled()) {
- LOG.debug("Locality miss. Attempting to allocate next available host from first requested host({}) for " +
- "task={}", firstRequestedHost, request.task);
+ LOG.debug("No-locality requested. Selecting a random host for task={}", request.task);
}
- int requestedHostIdx = -1;
- for (int i = 0; i < allNodes.size(); i++) {
- if (allNodes.get(i).getHost().equals(firstRequestedHost)) {
+ return randomSelection(activeNodesWithFreeSlots);
+ }
+
+ // miss in locality request, try picking consistent location with fallback to random selection
+ final String firstRequestedHost = requestedHosts[0];
+ int requestedHostIdx = -1;
+ for (int i = 0; i < allNodes.size(); i++) {
+ NodeInfo nodeInfo = allNodes.get(i);
+ if (nodeInfo != null) {
+ if (nodeInfo.getHost().equals(firstRequestedHost)) {
requestedHostIdx = i;
break;
}
}
+ }
- for (int i = 0; i < allNodes.size(); i++) {
- NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
- if (nodeInfo.canAcceptTask()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Assigning {} when looking for first requested host, from #hosts={},"
- + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(),
- ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
- requestedHostsDebugStr));
- }
- return new SelectHostResult(nodeInfo);
+ // requested host died or unknown host requested, fall back to random selection.
+ // TODO: At this point we don't know the slot number of the requested host, so can't rollover to next available
+ if (requestedHostIdx == -1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requested node [{}] in consistent order does not exist. Falling back to random selection for " +
+ "request {}", firstRequestedHost, request);
+ }
+ return randomSelection(activeNodesWithFreeSlots);
+ }
+
+ // requested host is still alive but cannot accept a task, pick the next available host in consistent order
+ for (int i = 0; i < allNodes.size(); i++) {
+ NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
+ // next node in consistent order died or has no free slots, roll over to the next
+ if (nodeInfo == null || !nodeInfo.canAcceptTask()) {
+ continue;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning {} in consistent order when looking for first requested host, from #hosts={},"
+ + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(),
+ ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+ requestedHostsDebugStr));
}
+ return new SelectHostResult(nodeInfo);
}
}
@@ -861,6 +888,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ private SelectHostResult randomSelection(final List<NodeInfo> nodesWithFreeSlots) {
+ if (nodesWithFreeSlots.isEmpty()) {
+ return SELECT_HOST_RESULT_DELAYED_RESOURCES;
+ }
+
+ NodeInfo randomNode = nodesWithFreeSlots.get(random.nextInt(nodesWithFreeSlots.size()));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts=null",
+ randomNode.toShortString(), nodesWithFreeSlots.size());
+ }
+ return new SelectHostResult(randomNode);
+ }
+
private void addNode(NodeInfo node, ServiceInstance serviceInstance) {
// we have just added a new node. Signal timeout monitor to reset timer
if (activeInstances.size() == 1) {
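For locality misses, the hunks above keep the consistent ring order but harden it: dead hosts now occupy null slots in allNodes so that surviving hosts keep their ring positions, the walk starts one past the first requested host and skips null or full entries, and an unknown or dead requested host falls back to randomSelection(). A self-contained sketch of that rollover, again under a hypothetical Node type rather than the actual Hive code:

import java.util.Arrays;
import java.util.List;

class RingRolloverSketch {
  static final class Node {
    final String host;
    final boolean hasFreeSlot;
    Node(String host, boolean hasFreeSlot) { this.host = host; this.hasFreeSlot = hasFreeSlot; }
  }

  // allNodes preserves the consistent order; dead hosts are represented as
  // null so the remaining hosts do not shift ring positions.
  static Node rollover(List<Node> allNodes, String firstRequestedHost) {
    int idx = -1;
    for (int i = 0; i < allNodes.size(); i++) {
      Node n = allNodes.get(i);
      if (n != null && n.host.equals(firstRequestedHost)) {
        idx = i;
        break;
      }
    }
    if (idx == -1) {
      return null; // requested host dead or unknown: caller falls back to random selection
    }
    for (int i = 0; i < allNodes.size(); i++) {
      Node n = allNodes.get((i + idx + 1) % allNodes.size());
      if (n != null && n.hasFreeSlot) {
        return n; // next live host with capacity, in ring order
      }
    }
    return null; // everything full: caller delays the request
  }

  public static void main(String[] args) {
    // Mirrors testHostPreferenceMissesConsistentPartialAlive below: host2 is
    // requested but full, host3 is dead (null slot), so the walk lands on host4.
    List<Node> ring = Arrays.asList(
        new Node("host1", true), new Node("host2", false), null, new Node("host4", true));
    System.out.println(rollover(ring, "host2").host); // prints host4
  }
}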
http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index d60635b..339f513 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +40,9 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
@@ -58,6 +63,8 @@ import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableSet;
+
public class TestLlapTaskSchedulerService {
private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class);
@@ -65,6 +72,7 @@ public class TestLlapTaskSchedulerService {
private static final String HOST1 = "host1";
private static final String HOST2 = "host2";
private static final String HOST3 = "host3";
+ private static final String HOST4 = "host4";
@Test(timeout = 10000)
public void testSimpleLocalAllocation() throws IOException, InterruptedException {
@@ -456,8 +464,8 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test(timeout = 10000000)
- public void testFallbackAllocationOrderedNext() throws IOException, InterruptedException {
+ @Test(timeout = 10000)
+ public void testHostPreferenceUnknownAndNotSpecified() throws IOException, InterruptedException {
Priority priority1 = Priority.newInstance(1);
String[] hostsKnown = new String[]{HOST1, HOST2};
@@ -476,10 +484,72 @@ public class TestLlapTaskSchedulerService {
Object task3 = "task3";
Object clientCookie3 = "cookie3";
+ Object task4 = "task4";
+ Object clientCookie4 = "cookie4";
+
tsWrapper.controlScheduler(true);
- tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1);
+ tsWrapper.allocateTask(task1, hostsKnown, priority1, clientCookie1);
tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
- tsWrapper.allocateTask(task3, noHosts, priority1, clientCookie3);
+ tsWrapper.allocateTask(task3, hostsUnknown, priority1, clientCookie3);
+ tsWrapper.allocateTask(task4, noHosts, priority1, clientCookie4);
+
+ while (true) {
+ tsWrapper.signalSchedulerRun();
+ tsWrapper.awaitSchedulerRun();
+ if (tsWrapper.ts.dagStats.numTotalAllocations == 4) {
+ break;
+ }
+ }
+
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(4))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+ assertEquals(4, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+ assertEquals(task3, argumentCaptor.getAllValues().get(2));
+ assertEquals(task4, argumentCaptor.getAllValues().get(3));
+ // 1st task requested host1, got host1
+ assertEquals(HOST1, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+ // 2nd task requested host1, got host1
+ assertEquals(HOST1, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+ // 3rd task requested unknown host, got host2 since host1 is full and only host2 is left in random pool
+ assertEquals(HOST2, argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+ // 4th task provided no location preference, got host2 since host1 is full and only host2 is left in random pool
+ assertEquals(HOST2, argumentCaptor2.getAllValues().get(3).getNodeId().getHost());
+
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+ assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testHostPreferenceMissesConsistentRollover() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+
+ String[] hostsKnown = new String[]{HOST1, HOST2, HOST3};
+ String[] hostsLive = new String[]{HOST1, HOST2, HOST3};
+ String[] hostsH2 = new String[]{HOST2};
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 0, 0L, false, hostsLive, true);
+ try {
+ Object task1 = "task1";
+ Object clientCookie1 = "cookie1";
+
+ Object task2 = "task2";
+ Object clientCookie2 = "cookie2";
+
+ Object task3 = "task3";
+ Object clientCookie3 = "cookie3";
+
+ tsWrapper.controlScheduler(true);
+ tsWrapper.allocateTask(task1, hostsH2, priority1, clientCookie1);
+ tsWrapper.allocateTask(task2, hostsH2, priority1, clientCookie2);
+ tsWrapper.allocateTask(task3, hostsH2, priority1, clientCookie3);
while (true) {
tsWrapper.signalSchedulerRun();
@@ -495,18 +565,79 @@ public class TestLlapTaskSchedulerService {
.taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
assertEquals(3, argumentCaptor.getAllValues().size());
assertEquals(task1, argumentCaptor.getAllValues().get(0));
- // 1st task provided unknown host location, it should be assigned first host
- assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
assertEquals(task2, argumentCaptor.getAllValues().get(1));
- // 2nd task provided host1 as location preference, it should be assigned host1 as it has capacity
- assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
assertEquals(task3, argumentCaptor.getAllValues().get(2));
- // 3rd task provided no location preference, it is tried with host1 but it is full, so gets assigned host2
- assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+ // 1st task requested host2, got host2
+ assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+ // 2nd task requested host2, got host3 as host2 is full
+ assertEquals(HOST3, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+ // 3rd task requested host2, got host1 as host2 and host3 are full
+ assertEquals(HOST1, argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
- assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+ verify(tsWrapper.mockServiceInstanceSet, times(2)).getAllInstancesOrdered(true);
+
+ assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
- assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testHostPreferenceMissesConsistentPartialAlive() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+
+ String[] hostsKnown = new String[]{HOST1, HOST2, HOST3, HOST4};
+ String[] hostsLive = new String[]{HOST1, HOST2, null, HOST4}; // host3 dead before scheduling
+ String[] hostsH2 = new String[]{HOST2};
+ String[] hostsH3 = new String[]{HOST3};
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 0, 0L, false, hostsLive, true);
+ try {
+ Object task1 = "task1";
+ Object clientCookie1 = "cookie1";
+
+ Object task2 = "task2";
+ Object clientCookie2 = "cookie2";
+
+ Object task3 = "task3";
+ Object clientCookie3 = "cookie3";
+
+ tsWrapper.controlScheduler(true);
+ tsWrapper.allocateTask(task1, hostsH2, priority1, clientCookie1);
+ tsWrapper.allocateTask(task2, hostsH2, priority1, clientCookie2);
+ tsWrapper.allocateTask(task3, hostsH3, priority1, clientCookie3);
+
+ while (true) {
+ tsWrapper.signalSchedulerRun();
+ tsWrapper.awaitSchedulerRun();
+ if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+ break;
+ }
+ }
+
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(3))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+ assertEquals(3, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+ assertEquals(task3, argumentCaptor.getAllValues().get(2));
+
+ // 1st task requested host2, got host2
+ assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+ // 2nd task requested host2, got host4 since host3 is dead and host2 is full
+ assertEquals(HOST4, argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+ // 3rd task requested host3, got host1 since host3 is dead and host4 is full
+ assertEquals(HOST1, argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+
+ verify(tsWrapper.mockServiceInstanceSet, times(2)).getAllInstancesOrdered(true);
+
+ assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+ assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations);
} finally {
tsWrapper.shutdown();
}
@@ -1316,6 +1447,7 @@ public class TestLlapTaskSchedulerService {
static final Resource resource = Resource.newInstance(1024, 1);
Configuration conf;
TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
+ ServiceInstanceSet mockServiceInstanceSet = mock(ServiceInstanceSet.class);
ControlledClock clock = new ControlledClock(new MonotonicClock());
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
LlapTaskSchedulerServiceForTest ts;
@@ -1344,12 +1476,20 @@ public class TestLlapTaskSchedulerService {
TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
IOException, InterruptedException {
+ this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, controlledDelayedTaskQueue,
+ hosts, false);
+ }
+
+ TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+ int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue, String[] liveHosts,
+ boolean useMockRegistry) throws
+ IOException, InterruptedException {
conf = new Configuration();
conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
- nodeDisableTimeoutMillis + "ms");
+ nodeDisableTimeoutMillis + "ms");
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
@@ -1358,15 +1498,46 @@ public class TestLlapTaskSchedulerService {
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
+ if (useMockRegistry) {
+ List<ServiceInstance> liveInstances = new ArrayList<>();
+ for (String host : liveHosts) {
+ if (host == null) {
+ ServiceInstance mockInactive = mock(InactiveServiceInstance.class);
+ doReturn(host).when(mockInactive).getHost();
+ doReturn("inactive-host-" + host).when(mockInactive).getWorkerIdentity();
+ doReturn(ImmutableSet.builder().add(mockInactive).build()).when(mockServiceInstanceSet).getByHost(host);
+ liveInstances.add(mockInactive);
+ } else {
+ ServiceInstance mockActive = mock(ServiceInstance.class);
+ doReturn(host).when(mockActive).getHost();
+ doReturn("host-" + host).when(mockActive).getWorkerIdentity();
+ doReturn(ImmutableSet.builder().add(mockActive).build()).when(mockServiceInstanceSet).getByHost(host);
+ liveInstances.add(mockActive);
+ }
+ }
+ doReturn(liveInstances).when(mockServiceInstanceSet).getAllInstancesOrdered(true);
+
+ List<ServiceInstance> allInstances = new ArrayList<>();
+ for (String host : hosts) {
+ ServiceInstance mockActive = mock(ServiceInstance.class);
+ doReturn(host).when(mockActive).getHost();
+ doReturn(Resource.newInstance(100, 1)).when(mockActive).getResource();
+ doReturn("host-" + host).when(mockActive).getWorkerIdentity();
+ allInstances.add(mockActive);
+ }
+ doReturn(allInstances).when(mockServiceInstanceSet).getAll();
+ }
if (controlledDelayedTaskQueue) {
ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
} else {
ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
}
-
controlScheduler(true);
ts.initialize();
ts.start();
+ if (useMockRegistry) {
+ ts.setServiceInstanceSet(mockServiceInstanceSet);
+ }
// One scheduler pass from the nodes that are added at startup
signalSchedulerRun();
controlScheduler(false);
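To make these scenarios reproducible, the wrapper above swaps the real registry for a mocked ServiceInstanceSet via the new setServiceInstanceSet() hook; a null entry in liveHosts is surfaced as an InactiveServiceInstance mock, which the scheduler then maps to a null ring slot. A condensed Mockito sketch of that convention (mirroring the harness code above, not a drop-in replacement for it):

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;

class MockRegistrySketch {
  // A null host stands for a daemon that died before scheduling; it still
  // appears in the ordered instance list, but as an InactiveServiceInstance.
  static ServiceInstanceSet mockRegistry(String... liveHosts) {
    ServiceInstanceSet set = mock(ServiceInstanceSet.class);
    List<ServiceInstance> ordered = new ArrayList<>();
    for (String host : liveHosts) {
      ServiceInstance inst =
          mock(host == null ? InactiveServiceInstance.class : ServiceInstance.class);
      doReturn(host).when(inst).getHost();
      doReturn((host == null ? "inactive-host-" : "host-") + host).when(inst).getWorkerIdentity();
      ordered.add(inst);
    }
    doReturn(ordered).when(set).getAllInstancesOrdered(true);
    return set;
  }
}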