You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/10 22:07:51 UTC
hive git commit: HIVE-18418 : clean up plugin between DAGs (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master cfd2b149a -> 69d32f589
HIVE-18418 : clean up plugin between DAGs (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/69d32f58
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/69d32f58
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/69d32f58
Branch: refs/heads/master
Commit: 69d32f58902fd055fd574d56d88a897f127deb92
Parents: cfd2b14
Author: sergey <se...@apache.org>
Authored: Wed Jan 10 14:00:44 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Jan 10 14:00:44 2018 -0800
----------------------------------------------------------------------
.../llap/tezplugins/LlapTaskSchedulerService.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/69d32f58/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 4a4915b..66de3b8 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -970,17 +970,28 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
int runningCount = 0;
+ // We don't send messages to pending tasks with the flags; they should be killed elsewhere.
for (Entry<Integer, TreeSet<TaskInfo>> entry : guaranteedTasks.entrySet()) {
- if (entry.getValue() != null) {
- runningCount += entry.getValue().size();
+ TreeSet<TaskInfo> set = speculativeTasks.get(entry.getKey());
+ if (set == null) {
+ set = new TreeSet<>();
+ speculativeTasks.put(entry.getKey(), set);
+ }
+ for (TaskInfo info : entry.getValue()) {
+ synchronized (info) {
+ info.isGuaranteed = false;
+ }
+ set.add(info);
}
}
+ guaranteedTasks.clear();
for (Entry<Integer, TreeSet<TaskInfo>> entry : speculativeTasks.entrySet()) {
if (entry.getValue() != null) {
runningCount += entry.getValue().size();
}
}
+ totalGuaranteed = unusedGuaranteed = 0;
LOG.info(
"DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}",
knownTasks.size(), pendingCount, runningCount);