You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this text.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/23 09:14:33 UTC
hive git commit: HIVE-15487: LLAP: Improvements to random selection while scheduling
Repository: hive
Updated Branches:
refs/heads/master 858ce8c22 -> 7befe8e67
HIVE-15487: LLAP: Improvements to random selection while scheduling
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7befe8e6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7befe8e6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7befe8e6
Branch: refs/heads/master
Commit: 7befe8e67a91142270667c9e3b727271231d9c9b
Parents: 858ce8c
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Dec 23 01:14:18 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Dec 23 01:14:18 2016 -0800
----------------------------------------------------------------------
.../tezplugins/LlapTaskSchedulerService.java | 65 +++++++++++++++-----
.../TestLlapTaskSchedulerService.java | 56 +++++++++++++++++
2 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7befe8e6/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 7838bef..b1240aa 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -782,27 +782,60 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
}
- /* fall through - miss in locality (random scheduling) or no locality-requested */
- Collection<ServiceInstance> instances = activeInstances.getAll();
- ArrayList<NodeInfo> all = new ArrayList<>(instances.size());
+
+ /* fall through - miss in locality or no locality-requested */
+ Collection<ServiceInstance> instances = activeInstances.getAllInstancesOrdered(true);
+ ArrayList<NodeInfo> allNodes = new ArrayList<>(instances.size());
for (ServiceInstance inst : instances) {
NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
- if (nodeInfo != null && nodeInfo.canAcceptTask()) {
- all.add(nodeInfo);
+ if (nodeInfo != null) {
+ allNodes.add(nodeInfo);
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Attempting random allocation for task={}", request.task);
- }
- if (all.isEmpty()) {
- return SELECT_HOST_RESULT_DELAYED_RESOURCES;
- }
- NodeInfo randomNode = all.get(random.nextInt(all.size()));
- LOG.info("Assigning " + randomNode.toShortString()
- + " when looking for any host, from #hosts=" + all.size() + ", requestedHosts="
- + ((requestedHosts == null || requestedHosts.length == 0)
+
+ if (requestedHosts == null || requestedHosts.length == 0) {
+ // no locality-requested, iterate the available hosts in consistent order from the beginning
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task);
+ }
+ for (NodeInfo nodeInfo : allNodes) {
+ if (nodeInfo.canAcceptTask()) {
+ LOG.info("Assigning " + nodeInfo.toShortString()
+ + " when looking for any host, from #hosts=" + allNodes.size() + ", requestedHosts="
+ + ((requestedHosts == null || requestedHosts.length == 0)
? "null" : Arrays.toString(requestedHosts)));
- return new SelectHostResult(randomNode);
+ return new SelectHostResult(nodeInfo);
+ }
+ }
+ } else {
+ // miss in locality request, try the next available host that can accept tasks (assume the consistent instances
+ // list as a ring) from the index of first requested host
+ final String firstRequestedHost = requestedHosts[0];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Locality miss. Attempting to allocate next available host from first requested host({}) for " +
+ "task={}", firstRequestedHost, request.task);
+ }
+ int requestedHostIdx = -1;
+ for (int i = 0; i < allNodes.size(); i++) {
+ if (allNodes.get(i).getHost().equals(firstRequestedHost)) {
+ requestedHostIdx = i;
+ break;
+ }
+ }
+
+ for (int i = 0; i < allNodes.size(); i++) {
+ NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
+ if (nodeInfo.canAcceptTask()) {
+ LOG.info("Assigning " + nodeInfo.toShortString()
+ + " when looking for first requested host, from #hosts=" + allNodes.size() + ", requestedHosts="
+ + ((requestedHosts == null || requestedHosts.length == 0)
+ ? "null" : Arrays.toString(requestedHosts)));
+ return new SelectHostResult(nodeInfo);
+ }
+ }
+ }
+
+ return SELECT_HOST_RESULT_DELAYED_RESOURCES;
} finally {
readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7befe8e6/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 85d2bcd..d60635b 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -456,6 +456,62 @@ public class TestLlapTaskSchedulerService {
}
}
+ @Test(timeout = 10000)
+ public void testFallbackAllocationOrderedNext() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ // Fallback (non-local) selection must walk the ordered instance list deterministically, not randomly.
+ String[] hostsKnown = new String[]{HOST1, HOST2};
+ String[] hostsUnknown = new String[]{HOST3};
+ String[] noHosts = new String[]{};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1L);
+ try {
+ Object task1 = "task1";
+ Object clientCookie1 = "cookie1";
+
+ Object task2 = "task2";
+ Object clientCookie2 = "cookie2";
+
+ Object task3 = "task3";
+ Object clientCookie3 = "cookie3";
+
+ tsWrapper.controlScheduler(true);
+ tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1);
+ tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
+ tsWrapper.allocateTask(task3, noHosts, priority1, clientCookie3);
+ // Drive the scheduler until all three tasks are placed; only the @Test timeout bounds this loop.
+ while (true) {
+ tsWrapper.signalSchedulerRun();
+ tsWrapper.awaitSchedulerRun();
+ if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+ break;
+ }
+ }
+
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(3))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+ assertEquals(3, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ // 1st task requested an unknown host (HOST3): locality miss, so the ring walk starts at the first ordered host
+ assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+ // 2nd task requested HOST1 first; HOST1 still has capacity after task1, so this is a local allocation
+ assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+ assertEquals(task3, argumentCaptor.getAllValues().get(2));
+ // 3rd task has no locality preference: HOST1 is tried first but is now full, so it falls through to HOST2
+ assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+ assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
@Test(timeout = 10000)
public void testForcedLocalityPreemption() throws IOException, InterruptedException {
Priority priority1 = Priority.newInstance(1);