You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/23 09:14:33 UTC

hive git commit: HIVE-15487: LLAP: Improvements to random selection while scheduling

Repository: hive
Updated Branches:
  refs/heads/master 858ce8c22 -> 7befe8e67


HIVE-15487: LLAP: Improvements to random selection while scheduling


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7befe8e6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7befe8e6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7befe8e6

Branch: refs/heads/master
Commit: 7befe8e67a91142270667c9e3b727271231d9c9b
Parents: 858ce8c
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Dec 23 01:14:18 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Dec 23 01:14:18 2016 -0800

----------------------------------------------------------------------
 .../tezplugins/LlapTaskSchedulerService.java    | 65 +++++++++++++++-----
 .../TestLlapTaskSchedulerService.java           | 56 +++++++++++++++++
 2 files changed, 105 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7befe8e6/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 7838bef..b1240aa 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -782,27 +782,60 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
         }
       }
-      /* fall through - miss in locality (random scheduling) or no locality-requested */
-      Collection<ServiceInstance> instances = activeInstances.getAll();
-      ArrayList<NodeInfo> all = new ArrayList<>(instances.size());
+
+      /* fall through - miss in locality or no locality-requested */
+      Collection<ServiceInstance> instances = activeInstances.getAllInstancesOrdered(true);
+      ArrayList<NodeInfo> allNodes = new ArrayList<>(instances.size());
       for (ServiceInstance inst : instances) {
         NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
-        if (nodeInfo != null && nodeInfo.canAcceptTask()) {
-          all.add(nodeInfo);
+        if (nodeInfo != null) {
+          allNodes.add(nodeInfo);
         }
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Attempting random allocation for task={}", request.task);
-      }
-      if (all.isEmpty()) {
-        return SELECT_HOST_RESULT_DELAYED_RESOURCES;
-      }
-      NodeInfo randomNode = all.get(random.nextInt(all.size()));
-      LOG.info("Assigning " + randomNode.toShortString()
-          + " when looking for any host, from #hosts=" + all.size() + ", requestedHosts="
-          + ((requestedHosts == null || requestedHosts.length == 0)
+
+      if (requestedHosts == null || requestedHosts.length == 0) {
+        // no locality-requested, iterate the available hosts in consistent order from the beginning
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task);
+        }
+        for (NodeInfo nodeInfo : allNodes) {
+          if (nodeInfo.canAcceptTask()) {
+            LOG.info("Assigning " + nodeInfo.toShortString()
+              + " when looking for any host, from #hosts=" + allNodes.size() + ", requestedHosts="
+              + ((requestedHosts == null || requestedHosts.length == 0)
               ? "null" : Arrays.toString(requestedHosts)));
-      return new SelectHostResult(randomNode);
+            return new SelectHostResult(nodeInfo);
+          }
+        }
+      } else {
+        // miss in locality request, try the next available host that can accept tasks (assume the consistent instances
+        // list as a ring) from the index of first requested host
+        final String firstRequestedHost = requestedHosts[0];
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Locality miss. Attempting to allocate next available host from first requested host({}) for " +
+            "task={}", firstRequestedHost, request.task);
+        }
+        int requestedHostIdx = -1;
+        for (int i = 0; i < allNodes.size(); i++) {
+          if (allNodes.get(i).getHost().equals(firstRequestedHost)) {
+            requestedHostIdx = i;
+            break;
+          }
+        }
+
+        for (int i = 0; i < allNodes.size(); i++) {
+          NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
+          if (nodeInfo.canAcceptTask()) {
+            LOG.info("Assigning " + nodeInfo.toShortString()
+              + " when looking for first requested host, from #hosts=" + allNodes.size() + ", requestedHosts="
+              + ((requestedHosts == null || requestedHosts.length == 0)
+              ? "null" : Arrays.toString(requestedHosts)));
+            return new SelectHostResult(nodeInfo);
+          }
+        }
+      }
+
+      return SELECT_HOST_RESULT_DELAYED_RESOURCES;
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/7befe8e6/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 85d2bcd..d60635b 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -456,6 +456,62 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testFallbackAllocationOrderedNext() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    // Only hostsKnown are handed to the scheduler wrapper, so HOST3 is an unknown location.
+    String[] hostsKnown = new String[]{HOST1, HOST2};
+    String[] hostsUnknown = new String[]{HOST3};
+    String[] noHosts = new String[]{};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+      new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1L);
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
+      tsWrapper.allocateTask(task3, noHosts, priority1, clientCookie3);
+      // Unbounded wait: relies on the @Test timeout to fail the test if allocation stalls.
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+          break;
+        }
+      }
+
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(3))
+        .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture());
+      assertEquals(3, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      // 1st task provided unknown host location, it should be assigned first host
+      assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(0).getNodeId().getHost());
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      // 2nd task provided host1 as location preference, it should be assigned host1 as it has capacity
+      assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost());
+      assertEquals(task3, argumentCaptor.getAllValues().get(2));
+      // 3rd task provided no location preference, it is tried with host1 but it is full, so gets assigned host2
+      assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost());
+
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
+      assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
   @Test(timeout = 10000)
   public void testForcedLocalityPreemption() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);