You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2020/02/26 23:16:17 UTC
[hive] branch master updated: HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
This is an automated email from the ASF dual-hosted git repository.
prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new cfc12f0 HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
cfc12f0 is described below
commit cfc12f05f0c034f9aad149960e58d40902e0dcfe
Author: Rajesh Balamohan <rb...@cloudera.com>
AuthorDate: Wed Feb 26 15:14:52 2020 -0800
HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
---
.../hive/llap/tezplugins/LlapTaskCommunicator.java | 57 +++++++++-------------
1 file changed, 24 insertions(+), 33 deletions(-)
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index e5dc378..b168f76 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -762,46 +762,37 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
BiMap<ContainerId, TezTaskAttemptID> biMap =
entityTracker.getContainerAttemptMapForNode(nodeId);
if (biMap != null) {
- HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
- for (int i = 0; i < tasks.get().length; ++i) {
- boolean isGuaranteed = false;
- if (guaranteed != null) {
- isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
- }
- attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
- }
- String error = "";
+ Set<TezTaskAttemptID> error = new HashSet<>();
synchronized (biMap) {
- for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
- // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
- // or at least (in this case) track the latest unique ID for LlapNode and retry all
- // older-node tasks proactively. For now let the heartbeats fail them.
- TezTaskAttemptID attemptId = entry.getValue();
- String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
- // Unique ID is registered based on Submit response. Theoretically, we could get a ping
- // when the task is valid but we haven't stored the unique ID yet, so taskNodeId is null.
- // However, the next heartbeat(s) should get the value eventually and mark task as alive.
- // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
- if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
- Boolean isGuaranteed = attempts.get(attemptId);
- if (isGuaranteed != null) {
- getContext().taskAlive(attemptId);
- scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
+ for (int i = 0; i < tasks.get().length; ++i) {
+ boolean isGuaranteed = false;
+ if (guaranteed != null) {
+ isGuaranteed = ((BooleanWritable) guaranteed.get()[i]).get();
+ }
+ TezTaskAttemptID attemptID = (TezTaskAttemptID) tasks.get()[i];
+
+ // Check if the taskAttempt is present in AM view
+ if (biMap.containsValue(attemptID)) {
+ String taskNodeId = entityTracker.getUniqueNodeId(attemptID);
+ if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+ getContext().taskAlive(attemptID);
+ scheduler.taskInfoUpdated(attemptID, isGuaranteed);
+ getContext().containerAlive(biMap.inverse().get(attemptID));
} else {
- error += (attemptId + ", ");
+ error.add(attemptID);
}
- getContext().containerAlive(entry.getKey());
}
}
+ }
- if (!error.isEmpty()) {
- LOG.info("The tasks we expected to be on the node are not there: " + error);
- for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
- LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
- "registered with different unique ID", entry.getValue());
- getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+ if (!error.isEmpty()) {
+ LOG.info("The tasks we expected to be on the node are not there: " + error);
+ for (TezTaskAttemptID attempt: error) {
+ LOG.info("Sending a kill for attempt {}, due to a ping from "
+ + "node with same host and same port but " +
+ "registered with different unique ID", attempt);
+ getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED,
"Node with same host and port but with new unique ID pinged");
- }
}
}
} else {