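You are viewing a plain-text version of this content. The canonical version is the original message in the commits@hive.apache.org mailing-list archive (the hyperlink was not preserved in this plain-text rendering).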
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/05 03:03:19 UTC

hive git commit: Revert "HIVE-18326 : LLAP Tez scheduler - only preempt tasks if there's a dependency between them (Sergey Shelukhin, reviewed by Eric Wohlstadter, Jason Dere)"

Repository: hive
Updated Branches:
  refs/heads/master 96a409e1c -> 20c9a3905


Revert "HIVE-18326 : LLAP Tez scheduler - only preempt tasks if there's a dependency between them (Sergey Shelukhin, reviewed by Eric Wohlstadter, Jason Dere)"

This reverts commit 3f5148d6aae94f2ae9db2aacccb302211834c699.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/20c9a390
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/20c9a390
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/20c9a390

Branch: refs/heads/master
Commit: 20c9a3905f4b1b627c935ad54a53a7a59015587c
Parents: 96a409e
Author: sergey <se...@apache.org>
Authored: Thu Jan 4 19:03:04 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Jan 4 19:03:04 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 -
 .../tezplugins/LlapTaskSchedulerService.java    | 165 ++++---------------
 2 files changed, 35 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/20c9a390/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6529da6..1dc7501 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3400,9 +3400,6 @@ public class HiveConf extends Configuration {
       "Backoff factor on successive blacklists of a node due to some failures. Blacklist times\n" +
       "start at the min timeout and go up to the max timeout based on this backoff factor.",
       "llap.task.scheduler.node.disable.backoff.factor"),
-    LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT("hive.llap.task.scheduler.preempt.independent", false,
-      "Whether the AM LLAP scheduler should preempt a lower priority task for a higher pri one\n" +
-      "even if the former doesn't depend on the latter (e.g. for two parallel sides of a union)."),
     LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE(
       "hive.llap.task.scheduler.num.schedulable.tasks.per.node", 0,
       "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/20c9a390/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 14bb85a..e97a267 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,15 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,17 +75,13 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.OperationCallback;
 import org.apache.hadoop.hive.llap.tezplugins.endpoint.LlapPluginServerImpl;
-import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
 import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
 import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
-import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
-import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -92,12 +97,8 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -108,9 +109,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -249,11 +248,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private LlapTaskCommunicator communicator;
   private final int amPort;
   private final String serializedToken, jobIdForToken;
-  // We expect the DAGs to not be super large, so store full dependency set for each vertex to
-  // avoid traversing the tree later. To save memory, this could be an array (of byte arrays?).
-  private final Object outputsLock = new Object();
-  private boolean isInitialized = false;
-  private Map<Integer, Set<Integer>> transitiveOutputs;
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
     this(taskSchedulerContext, new MonotonicClock(), true);
@@ -375,6 +369,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf);
     this.amRegistry = TezAmRegistryImpl.create(conf, true);
 
+
     synchronized (LlapTaskCommunicator.pluginInitLock) {
       LlapTaskCommunicator peer = LlapTaskCommunicator.instance;
       if (peer != null) {
@@ -388,84 +383,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  private Map<Integer, Set<Integer>> getDependencyInfo() {
-    synchronized (outputsLock) {
-      if (isInitialized) return transitiveOutputs;
-      isInitialized = true;
-      if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT)) {
-        this.transitiveOutputs = getTransitiveVertexOutputs(getContext().getCurrentDagInfo());
-      }
-      return this.transitiveOutputs;
-    }
-  }
-
-  private static Map<Integer, Set<Integer>> getTransitiveVertexOutputs(DagInfo info) {
-    if (!(info instanceof DAG)) {
-      LOG.warn("DAG info is not a DAG - cannot derive dependencies");
-      return null;
-    }
-    DAG dag = (DAG) info;
-    int vc = dag.getVertices().size();
-    // All the vertices belong to the same DAG, so we just use numbers.
-    Map<Integer, Set<Integer>> result = Maps.newHashMapWithExpectedSize(vc);
-    LinkedList<TezVertexID> queue = new LinkedList<>();
-    // We assume a DAG is a DAG, and that it's connected. Add direct dependencies.
-    for (Vertex v : dag.getVertices().values()) {
-      Map<Vertex, Edge> out = v.getOutputVertices();
-      if (out == null) {
-        result.put(v.getVertexId().getId(), Sets.newHashSet());
-      } else {
-        Set<Integer> set = Sets.newHashSetWithExpectedSize(vc);
-        for (Vertex outV : out.keySet()) {
-          set.add(outV.getVertexId().getId());
-        }
-        result.put(v.getVertexId().getId(), set);
-      }
-      if (v.getOutputVerticesCount() == 0) {
-        queue.add(v.getVertexId());
-      }
-    }
-    Set<Integer> processed = Sets.newHashSetWithExpectedSize(vc);
-    while (!queue.isEmpty()) {
-      TezVertexID id = queue.poll();
-      if (processed.contains(id.getId())) continue; // Already processed. See backtracking.
-      Vertex v = dag.getVertex(id);
-      Map<Vertex, Edge> out = v.getOutputVertices();
-      if (out != null) {
-        // Check that all the outputs have been processed; if not, insert them into queue
-        // before the current vertex and try again. It's possible e.g. in a structure like this:
-        //   _1
-        //  / 2
-        // 3  4 where 1 may be added to the queue before 2
-        boolean doBacktrack = false;
-        for (Vertex outV : out.keySet()) {
-          TezVertexID outId = outV.getVertexId();
-          int outNum = outId.getId();
-          if (!processed.contains(outNum)) {
-            if (!doBacktrack) {
-              queue.addFirst(id);
-              doBacktrack = true;
-            }
-            queue.addFirst(outId);
-          }
-        }
-        if (doBacktrack) continue;
-      }
-      int num = id.getId();
-      processed.add(num);
-      Set<Integer> deps = result.get(num);
-      Map<Vertex, Edge> in = v.getInputVertices();
-      if (in != null) {
-        for (Vertex inV : in.keySet()) {
-          queue.add(inV.getVertexId());
-          // Our outputs are the transitive outputs of our inputs.
-          result.get(inV.getVertexId().getId()).addAll(deps);
-        }
-      }
-    }
-    return result;
-  }
-
   private static Token<JobTokenIdentifier> createAmsToken(ApplicationId id) {
     if (!UserGroupInformation.isSecurityEnabled()) return null;
     JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString()));
@@ -1707,13 +1624,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
                   break;
                 }
               }
-
               if (shouldPreempt) {
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("Attempting to preempt for {} on potential hosts={}. TotalPendingPreemptions={}",
                       taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get());
                 }
-                preemptTasks(entry.getKey().getPriority(), vertexNum(taskInfo), 1, potentialHosts);
+                preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
               } else {
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("Not preempting for {} on potential hosts={}. An existing preemption request exists",
@@ -1731,7 +1647,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
                       "Attempting to preempt for task={}, priority={} on any available host",
                       taskInfo.task, taskInfo.priority);
                 }
-                preemptTasks(entry.getKey().getPriority(), vertexNum(taskInfo), 1, null);
+                preemptTasks(entry.getKey().getPriority(), 1, null);
               } else {
                 if (LOG.isDebugEnabled()) {
                   LOG.debug(
@@ -1770,10 +1686,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  private static int vertexNum(TaskInfo taskInfo) {
-    return taskInfo.getAttemptId().getTaskID().getVertexID().getId(); // Sigh...
-  }
-
   private String constructPendingTaskCountsLogMessage() {
     StringBuilder sb = new StringBuilder();
     int totalCount = 0;
@@ -1861,21 +1773,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   // Removes tasks from the runningList and sends out a preempt request to the system.
   // Subsequent tasks will be scheduled again once the de-allocate request for the preempted
   // task is processed.
-  private void preemptTasks(
-      int forPriority, int forVertex, int numTasksToPreempt, String []potentialHosts) {
+  private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) {
     Set<String> preemptHosts = null;
     writeLock.lock();
     List<TaskInfo> preemptedTaskList = null;
     try {
-      // TODO: numTasksToPreempt is currently always 1.
-      preemptedTaskList = preemptTasksFromMap(speculativeTasks, forPriority, forVertex,
-          numTasksToPreempt, potentialHosts, preemptHosts, preemptedTaskList);
+      preemptedTaskList = preemptTasksFromMap(speculativeTasks, forPriority, numTasksToPreempt,
+          potentialHosts, preemptHosts, preemptedTaskList);
       if (preemptedTaskList != null) {
         numTasksToPreempt -= preemptedTaskList.size();
       }
       if (numTasksToPreempt > 0) {
-        preemptedTaskList = preemptTasksFromMap(guaranteedTasks, forPriority, forVertex,
-            numTasksToPreempt, potentialHosts, preemptHosts, preemptedTaskList);
+        preemptedTaskList = preemptTasksFromMap(guaranteedTasks, forPriority, numTasksToPreempt,
+            potentialHosts, preemptHosts, preemptedTaskList);
       }
     } finally {
       writeLock.unlock();
@@ -1894,8 +1804,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   }
 
   private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> runningTasks,
-      int forPriority, int forVertex, int numTasksToPreempt, String[] potentialHosts,
-      Set<String> preemptHosts, List<TaskInfo> preemptedTaskList) {
+      int forPriority, int numTasksToPreempt, String[] potentialHosts, Set<String> preemptHosts,
+      List<TaskInfo> preemptedTaskList) {
     NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = runningTasks.descendingMap();
     Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = orderedMap.entrySet().iterator();
     int preemptedCount = 0;
@@ -1908,27 +1818,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
         while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
           TaskInfo taskInfo = taskInfoIterator.next();
-          if (preemptHosts != null && !preemptHosts.contains(taskInfo.assignedNode.getHost())) {
-            continue; // Not the right host.
-          }
-          Map<Integer,Set<Integer>> depInfo = getDependencyInfo();
-          if (depInfo != null && !depInfo.get(forVertex).contains(vertexNum(taskInfo))) {
-            // Only preempt if the task being preempted is "below" us in the dag.
-            continue;
-          }
-          // Candidate for preemption.
-          preemptedCount++;
-          LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
-              forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts));
-          taskInfo.setPreemptedInfo(clock.getTime());
-          if (preemptedTaskList == null) {
-            preemptedTaskList = new LinkedList<>();
+          if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) {
+            // Candidate for preemption.
+            preemptedCount++;
+            LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
+                forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts));
+            taskInfo.setPreemptedInfo(clock.getTime());
+            if (preemptedTaskList == null) {
+              preemptedTaskList = new LinkedList<>();
+            }
+            dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost());
+            preemptedTaskList.add(taskInfo);
+            registerPendingPreemption(taskInfo.assignedNode.getHost());
+            // Remove from the runningTaskList
+            taskInfoIterator.remove();
           }
-          dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost());
-          preemptedTaskList.add(taskInfo);
-          registerPendingPreemption(taskInfo.assignedNode.getHost());
-          // Remove from the runningTaskList
-          taskInfoIterator.remove();
         }
 
         // Remove entire priority level if it's been emptied.
@@ -2819,6 +2723,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return isPendingUpdate;
     }
 
+    @VisibleForTesting
     TezTaskAttemptID getAttemptId() {
       return attemptId;
     }