You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/05 03:03:19 UTC
hive git commit: Revert "HIVE-18326 : LLAP Tez scheduler - only preempt tasks if there's a dependency between them (Sergey Shelukhin, reviewed by Eric Wohlstadter, Jason Dere)"
Repository: hive
Updated Branches:
refs/heads/master 96a409e1c -> 20c9a3905
Revert "HIVE-18326 : LLAP Tez scheduler - only preempt tasks if there's a dependency between them (Sergey Shelukhin, reviewed by Eric Wohlstadter, Jason Dere)"
This reverts commit 3f5148d6aae94f2ae9db2aacccb302211834c699.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/20c9a390
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/20c9a390
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/20c9a390
Branch: refs/heads/master
Commit: 20c9a3905f4b1b627c935ad54a53a7a59015587c
Parents: 96a409e
Author: sergey <se...@apache.org>
Authored: Thu Jan 4 19:03:04 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Jan 4 19:03:04 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 -
.../tezplugins/LlapTaskSchedulerService.java | 165 ++++---------------
2 files changed, 35 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/20c9a390/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6529da6..1dc7501 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3400,9 +3400,6 @@ public class HiveConf extends Configuration {
"Backoff factor on successive blacklists of a node due to some failures. Blacklist times\n" +
"start at the min timeout and go up to the max timeout based on this backoff factor.",
"llap.task.scheduler.node.disable.backoff.factor"),
- LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT("hive.llap.task.scheduler.preempt.independent", false,
- "Whether the AM LLAP scheduler should preempt a lower priority task for a higher pri one\n" +
- "even if the former doesn't depend on the latter (e.g. for two parallel sides of a union)."),
LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE(
"hive.llap.task.scheduler.num.schedulable.tasks.per.node", 0,
"The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/20c9a390/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 14bb85a..e97a267 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,15 @@
package org.apache.hadoop.hive.llap.tezplugins;
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -66,17 +75,13 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.OperationCallback;
import org.apache.hadoop.hive.llap.tezplugins.endpoint.LlapPluginServerImpl;
-import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
-import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
-import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -92,12 +97,8 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -108,9 +109,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -249,11 +248,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private LlapTaskCommunicator communicator;
private final int amPort;
private final String serializedToken, jobIdForToken;
- // We expect the DAGs to not be super large, so store full dependency set for each vertex to
- // avoid traversing the tree later. To save memory, this could be an array (of byte arrays?).
- private final Object outputsLock = new Object();
- private boolean isInitialized = false;
- private Map<Integer, Set<Integer>> transitiveOutputs;
public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
this(taskSchedulerContext, new MonotonicClock(), true);
@@ -375,6 +369,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf);
this.amRegistry = TezAmRegistryImpl.create(conf, true);
+
synchronized (LlapTaskCommunicator.pluginInitLock) {
LlapTaskCommunicator peer = LlapTaskCommunicator.instance;
if (peer != null) {
@@ -388,84 +383,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
- private Map<Integer, Set<Integer>> getDependencyInfo() {
- synchronized (outputsLock) {
- if (isInitialized) return transitiveOutputs;
- isInitialized = true;
- if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT)) {
- this.transitiveOutputs = getTransitiveVertexOutputs(getContext().getCurrentDagInfo());
- }
- return this.transitiveOutputs;
- }
- }
-
- private static Map<Integer, Set<Integer>> getTransitiveVertexOutputs(DagInfo info) {
- if (!(info instanceof DAG)) {
- LOG.warn("DAG info is not a DAG - cannot derive dependencies");
- return null;
- }
- DAG dag = (DAG) info;
- int vc = dag.getVertices().size();
- // All the vertices belong to the same DAG, so we just use numbers.
- Map<Integer, Set<Integer>> result = Maps.newHashMapWithExpectedSize(vc);
- LinkedList<TezVertexID> queue = new LinkedList<>();
- // We assume a DAG is a DAG, and that it's connected. Add direct dependencies.
- for (Vertex v : dag.getVertices().values()) {
- Map<Vertex, Edge> out = v.getOutputVertices();
- if (out == null) {
- result.put(v.getVertexId().getId(), Sets.newHashSet());
- } else {
- Set<Integer> set = Sets.newHashSetWithExpectedSize(vc);
- for (Vertex outV : out.keySet()) {
- set.add(outV.getVertexId().getId());
- }
- result.put(v.getVertexId().getId(), set);
- }
- if (v.getOutputVerticesCount() == 0) {
- queue.add(v.getVertexId());
- }
- }
- Set<Integer> processed = Sets.newHashSetWithExpectedSize(vc);
- while (!queue.isEmpty()) {
- TezVertexID id = queue.poll();
- if (processed.contains(id.getId())) continue; // Already processed. See backtracking.
- Vertex v = dag.getVertex(id);
- Map<Vertex, Edge> out = v.getOutputVertices();
- if (out != null) {
- // Check that all the outputs have been processed; if not, insert them into queue
- // before the current vertex and try again. It's possible e.g. in a structure like this:
- // _1
- // / 2
- // 3 4 where 1 may be added to the queue before 2
- boolean doBacktrack = false;
- for (Vertex outV : out.keySet()) {
- TezVertexID outId = outV.getVertexId();
- int outNum = outId.getId();
- if (!processed.contains(outNum)) {
- if (!doBacktrack) {
- queue.addFirst(id);
- doBacktrack = true;
- }
- queue.addFirst(outId);
- }
- }
- if (doBacktrack) continue;
- }
- int num = id.getId();
- processed.add(num);
- Set<Integer> deps = result.get(num);
- Map<Vertex, Edge> in = v.getInputVertices();
- if (in != null) {
- for (Vertex inV : in.keySet()) {
- queue.add(inV.getVertexId());
- // Our outputs are the transitive outputs of our inputs.
- result.get(inV.getVertexId().getId()).addAll(deps);
- }
- }
- }
- return result;
- }
-
private static Token<JobTokenIdentifier> createAmsToken(ApplicationId id) {
if (!UserGroupInformation.isSecurityEnabled()) return null;
JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString()));
@@ -1707,13 +1624,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
break;
}
}
-
if (shouldPreempt) {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to preempt for {} on potential hosts={}. TotalPendingPreemptions={}",
taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get());
}
- preemptTasks(entry.getKey().getPriority(), vertexNum(taskInfo), 1, potentialHosts);
+ preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Not preempting for {} on potential hosts={}. An existing preemption request exists",
@@ -1731,7 +1647,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
"Attempting to preempt for task={}, priority={} on any available host",
taskInfo.task, taskInfo.priority);
}
- preemptTasks(entry.getKey().getPriority(), vertexNum(taskInfo), 1, null);
+ preemptTasks(entry.getKey().getPriority(), 1, null);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
@@ -1770,10 +1686,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
- private static int vertexNum(TaskInfo taskInfo) {
- return taskInfo.getAttemptId().getTaskID().getVertexID().getId(); // Sigh...
- }
-
private String constructPendingTaskCountsLogMessage() {
StringBuilder sb = new StringBuilder();
int totalCount = 0;
@@ -1861,21 +1773,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Removes tasks from the runningList and sends out a preempt request to the system.
// Subsequent tasks will be scheduled again once the de-allocate request for the preempted
// task is processed.
- private void preemptTasks(
- int forPriority, int forVertex, int numTasksToPreempt, String []potentialHosts) {
+ private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) {
Set<String> preemptHosts = null;
writeLock.lock();
List<TaskInfo> preemptedTaskList = null;
try {
- // TODO: numTasksToPreempt is currently always 1.
- preemptedTaskList = preemptTasksFromMap(speculativeTasks, forPriority, forVertex,
- numTasksToPreempt, potentialHosts, preemptHosts, preemptedTaskList);
+ preemptedTaskList = preemptTasksFromMap(speculativeTasks, forPriority, numTasksToPreempt,
+ potentialHosts, preemptHosts, preemptedTaskList);
if (preemptedTaskList != null) {
numTasksToPreempt -= preemptedTaskList.size();
}
if (numTasksToPreempt > 0) {
- preemptedTaskList = preemptTasksFromMap(guaranteedTasks, forPriority, forVertex,
- numTasksToPreempt, potentialHosts, preemptHosts, preemptedTaskList);
+ preemptedTaskList = preemptTasksFromMap(guaranteedTasks, forPriority, numTasksToPreempt,
+ potentialHosts, preemptHosts, preemptedTaskList);
}
} finally {
writeLock.unlock();
@@ -1894,8 +1804,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> runningTasks,
- int forPriority, int forVertex, int numTasksToPreempt, String[] potentialHosts,
- Set<String> preemptHosts, List<TaskInfo> preemptedTaskList) {
+ int forPriority, int numTasksToPreempt, String[] potentialHosts, Set<String> preemptHosts,
+ List<TaskInfo> preemptedTaskList) {
NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = runningTasks.descendingMap();
Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = orderedMap.entrySet().iterator();
int preemptedCount = 0;
@@ -1908,27 +1818,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
TaskInfo taskInfo = taskInfoIterator.next();
- if (preemptHosts != null && !preemptHosts.contains(taskInfo.assignedNode.getHost())) {
- continue; // Not the right host.
- }
- Map<Integer,Set<Integer>> depInfo = getDependencyInfo();
- if (depInfo != null && !depInfo.get(forVertex).contains(vertexNum(taskInfo))) {
- // Only preempt if the task being preempted is "below" us in the dag.
- continue;
- }
- // Candidate for preemption.
- preemptedCount++;
- LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
- forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts));
- taskInfo.setPreemptedInfo(clock.getTime());
- if (preemptedTaskList == null) {
- preemptedTaskList = new LinkedList<>();
+ if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) {
+ // Candidate for preemption.
+ preemptedCount++;
+ LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
+ forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts));
+ taskInfo.setPreemptedInfo(clock.getTime());
+ if (preemptedTaskList == null) {
+ preemptedTaskList = new LinkedList<>();
+ }
+ dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost());
+ preemptedTaskList.add(taskInfo);
+ registerPendingPreemption(taskInfo.assignedNode.getHost());
+ // Remove from the runningTaskList
+ taskInfoIterator.remove();
}
- dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost());
- preemptedTaskList.add(taskInfo);
- registerPendingPreemption(taskInfo.assignedNode.getHost());
- // Remove from the runningTaskList
- taskInfoIterator.remove();
}
// Remove entire priority level if it's been emptied.
@@ -2819,6 +2723,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return isPendingUpdate;
}
+ @VisibleForTesting
TezTaskAttemptID getAttemptId() {
return attemptId;
}