You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2020/02/26 23:16:17 UTC

[hive] branch master updated: HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)

This is an automated email from the ASF dual-hosted git repository.

prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cfc12f0  HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
cfc12f0 is described below

commit cfc12f05f0c034f9aad149960e58d40902e0dcfe
Author: Rajesh Balamohan <rb...@cloudera.com>
AuthorDate: Wed Feb 26 15:14:52 2020 -0800

    HIVE-22927: LLAP should filter tasks in HB, instead of killing all tasks on error attempts (Rajesh Balamohan reviewed by Prasanth Jayachandran)
---
 .../hive/llap/tezplugins/LlapTaskCommunicator.java | 57 +++++++++-------------
 1 file changed, 24 insertions(+), 33 deletions(-)

diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index e5dc378..b168f76 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -762,46 +762,37 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
-      HashMap<TezTaskAttemptID, Boolean> attempts = new HashMap<>();
-      for (int i = 0; i < tasks.get().length; ++i) {
-        boolean isGuaranteed = false;
-        if (guaranteed != null) {
-          isGuaranteed = ((BooleanWritable)guaranteed.get()[i]).get();
-        }
-        attempts.put((TezTaskAttemptID)tasks.get()[i], isGuaranteed);
-      }
-      String error = "";
+      Set<TezTaskAttemptID> error = new HashSet<>();
       synchronized (biMap) {
-        for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-          // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
-          //       or at least (in this case) track the latest unique ID for LlapNode and retry all
-          //       older-node tasks proactively. For now let the heartbeats fail them.
-          TezTaskAttemptID attemptId = entry.getValue();
-          String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
-          // Unique ID is registered based on Submit response. Theoretically, we could get a ping
-          // when the task is valid but we haven't stored the unique ID yet, so taskNodeId is null.
-          // However, the next heartbeat(s) should get the value eventually and mark task as alive.
-          // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET.
-          if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
-            Boolean isGuaranteed = attempts.get(attemptId);
-            if (isGuaranteed != null) {
-              getContext().taskAlive(attemptId);
-              scheduler.taskInfoUpdated(attemptId, isGuaranteed.booleanValue());
+        for (int i = 0; i < tasks.get().length; ++i) {
+          boolean isGuaranteed = false;
+          if (guaranteed != null) {
+            isGuaranteed = ((BooleanWritable) guaranteed.get()[i]).get();
+          }
+          TezTaskAttemptID attemptID = (TezTaskAttemptID) tasks.get()[i];
+
+          // Check if the taskAttempt is present in AM view
+          if (biMap.containsValue(attemptID)) {
+            String taskNodeId = entityTracker.getUniqueNodeId(attemptID);
+            if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+              getContext().taskAlive(attemptID);
+              scheduler.taskInfoUpdated(attemptID, isGuaranteed);
+              getContext().containerAlive(biMap.inverse().get(attemptID));
             } else {
-              error += (attemptId + ", ");
+              error.add(attemptID);
             }
-            getContext().containerAlive(entry.getKey());
           }
         }
+      }
 
-        if (!error.isEmpty()) {
-          LOG.info("The tasks we expected to be on the node are not there: " + error);
-          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-            LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
-                "registered with different unique ID", entry.getValue());
-            getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+      if (!error.isEmpty()) {
+        LOG.info("The tasks we expected to be on the node are not there: " + error);
+        for (TezTaskAttemptID attempt: error) {
+          LOG.info("Sending a kill for attempt {}, due to a ping from "
+              + "node with same host and same port but " +
+              "registered with different unique ID", attempt);
+          getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED,
               "Node with same host and port but with new unique ID pinged");
-          }
         }
       }
     } else {