You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/02/21 16:22:57 UTC

[hive] branch master updated: HIVE-22359 : LLAP: when a node restarts with the exact same host/port in kubernetes it is not detected as a task failure (Prasanth J via Gopal V)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d8ce84  HIVE-22359 : LLAP: when a node restarts with the exact same host/port in kubernetes it is not detected as a task failure (Prasanth J via Gopal V)
1d8ce84 is described below

commit 1d8ce84f5f459f17220bb81558432d81cbe956dd
Author: Prasanth Jayachandran <pr...@apache.org>
AuthorDate: Fri Feb 21 08:22:05 2020 -0800

    HIVE-22359 : LLAP: when a node restarts with the exact same host/port in kubernetes it is not detected as a task failure (Prasanth J via Gopal V)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/llap/tezplugins/LlapTaskCommunicator.java    | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index dc10f22..e5dc378 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -724,13 +724,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
   }
 
-  public void registerPingingNode(LlapNodeId nodeId) {
+  public void registerPingingNode(LlapNodeId nodeId, String uniqueId) {
     long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
     PingingNodeInfo ni = new PingingNodeInfo(currentTs);
     PingingNodeInfo old = pingedNodeMap.put(nodeId, ni);
     if (old == null) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("Added new pinging node: [{}]", nodeId);
+        LOG.info("Added new pinging node: [{}] with uniqueId: {}", nodeId, uniqueId);
       }
     } else {
       old.pingCount.incrementAndGet();
@@ -758,7 +758,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       TezAttemptArray tasks, BooleanArray guaranteed) {
     // TODO: do we ever need the port? we could just do away with nodeId altogether.
     LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
-    registerPingingNode(nodeId);
+    registerPingingNode(nodeId, uniqueId);
     BiMap<ContainerId, TezTaskAttemptID> biMap =
         entityTracker.getContainerAttemptMapForNode(nodeId);
     if (biMap != null) {
@@ -793,9 +793,16 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
             getContext().containerAlive(entry.getKey());
           }
         }
-      }
-      if (!error.isEmpty()) {
-        LOG.info("The tasks we expected to be on the node are not there: " + error);
+
+        if (!error.isEmpty()) {
+          LOG.info("The tasks we expected to be on the node are not there: " + error);
+          for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
+            LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
+                "registered with different unique ID", entry.getValue());
+            getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+              "Node with same host and port but with new unique ID pinged");
+          }
+        }
       }
     } else {
       long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);