You are viewing a plain-text version of this content. The canonical link for it is not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by se...@apache.org on 2016/12/07 19:42:06 UTC

hive git commit: HIVE-15296 : AM may lose task failures and not reschedule when scheduling to LLAP (Sergey Shelukhin, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master 70cc5efac -> 8804a7b89


HIVE-15296 : AM may lose task failures and not reschedule when scheduling to LLAP (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8804a7b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8804a7b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8804a7b8

Branch: refs/heads/master
Commit: 8804a7b89bf3d1d6e78d850cd524840e0bd71051
Parents: 70cc5ef
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 7 11:14:57 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 7 11:14:57 2016 -0800

----------------------------------------------------------------------
 .../ext/LlapTaskUmbilicalExternalClient.java    |  34 +-
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 323 ++++++++++++++-----
 .../org/apache/hadoop/hive/llap/DaemonId.java   |  17 +-
 .../protocol/LlapTaskUmbilicalProtocol.java     |   2 +-
 .../src/protobuf/LlapDaemonProtocol.proto       |   1 +
 .../hive/llap/daemon/impl/AMReporter.java       |  12 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   7 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |   2 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |  53 ++-
 9 files changed, 336 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4933fb3..3e0232d 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -81,6 +81,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
   private static class TaskHeartbeatInfo {
     final String taskAttemptId;
     final String hostname;
+    String uniqueNodeId;
     final int port;
     final AtomicLong lastHeartbeat = new AtomicLong();
 
@@ -161,8 +162,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
         vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
     final String fragmentId = attemptId.toString();
 
-    pendingEvents.putIfAbsent(fragmentId, new PendingEventData(
-        new TaskHeartbeatInfo(fragmentId, llapHost, llapPort), Lists.<TezEvent>newArrayList()));
+    final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(fragmentId, llapHost, llapPort);
+    pendingEvents.putIfAbsent(
+        fragmentId, new PendingEventData(thi, Lists.<TezEvent>newArrayList()));
 
     // Setup timer task to check for hearbeat timeouts
     timer.scheduleAtFixedRate(new HeartbeatCheckTask(),
@@ -185,6 +187,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
                 return;
               }
             }
+            if (response.hasUniqueNodeId()) {
+              thi.uniqueNodeId = response.getUniqueNodeId();
+            }
           }
 
           @Override
@@ -217,26 +222,29 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     }
   }
 
-  private void updateHeartbeatInfo(String hostname, int port) {
+  private void updateHeartbeatInfo(String hostname, String uniqueId, int port) {
     int updateCount = 0;
 
     for (String key : pendingEvents.keySet()) {
       PendingEventData pendingEventData = pendingEvents.get(key);
       if (pendingEventData != null) {
-        if (pendingEventData.heartbeatInfo.hostname.equals(hostname)
-            && pendingEventData.heartbeatInfo.port == port) {
-          pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+        TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo;
+        String thiUniqueId = thi.uniqueNodeId;
+        if (thi.hostname.equals(hostname) && thi.port == port
+            && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
+          thi.lastHeartbeat.set(System.currentTimeMillis());
           updateCount++;
         }
       }
     }
 
     for (String key : registeredTasks.keySet()) {
-      TaskHeartbeatInfo heartbeatInfo = registeredTasks.get(key);
-      if (heartbeatInfo != null) {
-        if (heartbeatInfo.hostname.equals(hostname)
-            && heartbeatInfo.port == port) {
-          heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
+      TaskHeartbeatInfo thi = registeredTasks.get(key);
+      if (thi != null) {
+        String thiUniqueId = thi.uniqueNodeId;
+        if (thi.hostname.equals(hostname) && thi.port == port
+            && (thiUniqueId != null && thiUniqueId.equals(uniqueId))) {
+          thi.lastHeartbeat.set(System.currentTimeMillis());
           updateCount++;
         }
       }
@@ -387,8 +395,8 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService implements
     }
 
     @Override
-    public void nodeHeartbeat(Text hostname, int port) throws IOException {
-      updateHeartbeatInfo(hostname.toString(), port);
+    public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
+      updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port);
       // No need to propagate to this to the responder
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 0581681..ece31ed 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -1,5 +1,5 @@
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
-// source: src/protobuf/LlapDaemonProtocol.proto
+// source: LlapDaemonProtocol.proto
 
 package org.apache.hadoop.hive.llap.daemon.rpc;
 
@@ -11297,6 +11297,21 @@ public final class LlapDaemonProtocolProtos {
      * <code>optional .SubmissionStateProto submission_state = 1;</code>
      */
     org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState();
+
+    // optional string unique_node_id = 2;
+    /**
+     * <code>optional string unique_node_id = 2;</code>
+     */
+    boolean hasUniqueNodeId();
+    /**
+     * <code>optional string unique_node_id = 2;</code>
+     */
+    java.lang.String getUniqueNodeId();
+    /**
+     * <code>optional string unique_node_id = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getUniqueNodeIdBytes();
   }
   /**
    * Protobuf type {@code SubmitWorkResponseProto}
@@ -11360,6 +11375,11 @@ public final class LlapDaemonProtocolProtos {
               }
               break;
             }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              uniqueNodeId_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -11416,8 +11436,52 @@ public final class LlapDaemonProtocolProtos {
       return submissionState_;
     }
 
+    // optional string unique_node_id = 2;
+    public static final int UNIQUE_NODE_ID_FIELD_NUMBER = 2;
+    private java.lang.Object uniqueNodeId_;
+    /**
+     * <code>optional string unique_node_id = 2;</code>
+     */
+    public boolean hasUniqueNodeId() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string unique_node_id = 2;</code>
+     */
+    public java.lang.String getUniqueNodeId() {
+      java.lang.Object ref = uniqueNodeId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          uniqueNodeId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string unique_node_id = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getUniqueNodeIdBytes() {
+      java.lang.Object ref = uniqueNodeId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        uniqueNodeId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+      uniqueNodeId_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -11434,6 +11498,9 @@ public final class LlapDaemonProtocolProtos {
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeEnum(1, submissionState_.getNumber());
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getUniqueNodeIdBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -11447,6 +11514,10 @@ public final class LlapDaemonProtocolProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeEnumSize(1, submissionState_.getNumber());
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getUniqueNodeIdBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -11475,6 +11546,11 @@ public final class LlapDaemonProtocolProtos {
         result = result &&
             (getSubmissionState() == other.getSubmissionState());
       }
+      result = result && (hasUniqueNodeId() == other.hasUniqueNodeId());
+      if (hasUniqueNodeId()) {
+        result = result && getUniqueNodeId()
+            .equals(other.getUniqueNodeId());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -11492,6 +11568,10 @@ public final class LlapDaemonProtocolProtos {
         hash = (37 * hash) + SUBMISSION_STATE_FIELD_NUMBER;
         hash = (53 * hash) + hashEnum(getSubmissionState());
       }
+      if (hasUniqueNodeId()) {
+        hash = (37 * hash) + UNIQUE_NODE_ID_FIELD_NUMBER;
+        hash = (53 * hash) + getUniqueNodeId().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -11603,6 +11683,8 @@ public final class LlapDaemonProtocolProtos {
         super.clear();
         submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
         bitField0_ = (bitField0_ & ~0x00000001);
+        uniqueNodeId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
 
@@ -11635,6 +11717,10 @@ public final class LlapDaemonProtocolProtos {
           to_bitField0_ |= 0x00000001;
         }
         result.submissionState_ = submissionState_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.uniqueNodeId_ = uniqueNodeId_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -11654,6 +11740,11 @@ public final class LlapDaemonProtocolProtos {
         if (other.hasSubmissionState()) {
           setSubmissionState(other.getSubmissionState());
         }
+        if (other.hasUniqueNodeId()) {
+          bitField0_ |= 0x00000002;
+          uniqueNodeId_ = other.uniqueNodeId_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11717,6 +11808,80 @@ public final class LlapDaemonProtocolProtos {
         return this;
       }
 
+      // optional string unique_node_id = 2;
+      private java.lang.Object uniqueNodeId_ = "";
+      /**
+       * <code>optional string unique_node_id = 2;</code>
+       */
+      public boolean hasUniqueNodeId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string unique_node_id = 2;</code>
+       */
+      public java.lang.String getUniqueNodeId() {
+        java.lang.Object ref = uniqueNodeId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          uniqueNodeId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string unique_node_id = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getUniqueNodeIdBytes() {
+        java.lang.Object ref = uniqueNodeId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          uniqueNodeId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string unique_node_id = 2;</code>
+       */
+      public Builder setUniqueNodeId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        uniqueNodeId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string unique_node_id = 2;</code>
+       */
+      public Builder clearUniqueNodeId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        uniqueNodeId_ = getDefaultInstance().getUniqueNodeId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string unique_node_id = 2;</code>
+       */
+      public Builder setUniqueNodeIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        uniqueNodeId_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:SubmitWorkResponseProto)
     }
 
@@ -17246,83 +17411,83 @@ public final class LlapDaemonProtocolProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n%src/protobuf/LlapDaemonProtocol.proto\"" +
-      "9\n\020UserPayloadProto\022\024\n\014user_payload\030\001 \001(" +
-      "\014\022\017\n\007version\030\002 \001(\005\"j\n\025EntityDescriptorPr" +
-      "oto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014user_payload\030" +
-      "\002 \001(\0132\021.UserPayloadProto\022\024\n\014history_text" +
-      "\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025connected_verte" +
-      "x_name\030\001 \001(\t\022-\n\rio_descriptor\030\002 \001(\0132\026.En" +
-      "tityDescriptorProto\022\033\n\023physical_edge_cou" +
-      "nt\030\003 \001(\005\"z\n\023GroupInputSpecProto\022\022\n\ngroup" +
-      "_name\030\001 \001(\t\022\026\n\016group_vertices\030\002 \003(\t\0227\n\027m",
-      "erged_input_descriptor\030\003 \001(\0132\026.EntityDes" +
-      "criptorProto\"\245\003\n\022SignableVertexSpec\022\014\n\004u" +
-      "ser\030\001 \001(\t\022\026\n\016signatureKeyId\030\002 \001(\003\022/\n\020que" +
-      "ry_identifier\030\003 \001(\0132\025.QueryIdentifierPro" +
-      "to\022\025\n\rhive_query_id\030\004 \001(\t\022\020\n\010dag_name\030\005 " +
-      "\001(\t\022\023\n\013vertex_name\030\006 \001(\t\022\024\n\014vertex_index" +
-      "\030\007 \001(\005\022\030\n\020token_identifier\030\010 \001(\t\0224\n\024proc" +
-      "essor_descriptor\030\t \001(\0132\026.EntityDescripto" +
-      "rProto\022!\n\013input_specs\030\n \003(\0132\014.IOSpecProt" +
-      "o\022\"\n\014output_specs\030\013 \003(\0132\014.IOSpecProto\0221\n",
-      "\023grouped_input_specs\030\014 \003(\0132\024.GroupInputS" +
-      "pecProto\022\032\n\022vertex_parallelism\030\r \001(\005\"K\n\016" +
-      "VertexOrBinary\022#\n\006vertex\030\001 \001(\0132\023.Signabl" +
-      "eVertexSpec\022\024\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023Fr" +
-      "agmentRuntimeInfo\022#\n\033num_self_and_upstre" +
-      "am_tasks\030\001 \001(\005\022-\n%num_self_and_upstream_" +
-      "completed_tasks\030\002 \001(\005\022\033\n\023within_dag_prio" +
-      "rity\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030fi" +
-      "rst_attempt_start_time\030\005 \001(\003\022\"\n\032current_" +
-      "attempt_start_time\030\006 \001(\003\"d\n\024QueryIdentif",
-      "ierProto\022\035\n\025application_id_string\030\001 \001(\t\022" +
-      "\021\n\tdag_index\030\002 \001(\005\022\032\n\022app_attempt_number" +
-      "\030\003 \001(\005\"l\n\013NotTezEvent\022\037\n\027input_event_pro" +
-      "to_bytes\030\001 \002(\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017d" +
-      "est_input_name\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n" +
-      "\026SubmitWorkRequestProto\022\"\n\twork_spec\030\001 \001" +
-      "(\0132\017.VertexOrBinary\022\033\n\023work_spec_signatu" +
-      "re\030\002 \001(\014\022\027\n\017fragment_number\030\003 \001(\005\022\026\n\016att" +
-      "empt_number\030\004 \001(\005\022\033\n\023container_id_string" +
-      "\030\005 \001(\t\022\017\n\007am_host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005",
-      "\022\032\n\022credentials_binary\030\010 \001(\014\0223\n\025fragment" +
-      "_runtime_info\030\t \001(\0132\024.FragmentRuntimeInf" +
-      "o\022\033\n\023initial_event_bytes\030\n \001(\014\022\037\n\027initia" +
-      "l_event_signature\030\013 \001(\014\"J\n\027SubmitWorkRes" +
-      "ponseProto\022/\n\020submission_state\030\001 \001(\0162\025.S" +
-      "ubmissionStateProto\"\205\001\n\036SourceStateUpdat" +
-      "edRequestProto\022/\n\020query_identifier\030\001 \001(\013" +
-      "2\025.QueryIdentifierProto\022\020\n\010src_name\030\002 \001(" +
-      "\t\022 \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037S" +
-      "ourceStateUpdatedResponseProto\"e\n\031QueryC",
-      "ompleteRequestProto\022/\n\020query_identifier\030" +
-      "\001 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete_d" +
-      "elay\030\002 \001(\003:\0010\"\034\n\032QueryCompleteResponsePr" +
-      "oto\"t\n\035TerminateFragmentRequestProto\022/\n\020" +
-      "query_identifier\030\001 \001(\0132\025.QueryIdentifier" +
-      "Proto\022\"\n\032fragment_identifier_string\030\002 \001(" +
-      "\t\" \n\036TerminateFragmentResponseProto\"&\n\024G" +
-      "etTokenRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025G" +
-      "etTokenResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033L" +
-      "lapOutputSocketInitMessage\022\023\n\013fragment_i",
-      "d\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020SourceStatePro" +
-      "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Su" +
-      "bmissionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJE" +
-      "CTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemon" +
-      "Protocol\022?\n\nsubmitWork\022\027.SubmitWorkReque" +
-      "stProto\032\030.SubmitWorkResponseProto\022W\n\022sou" +
-      "rceStateUpdated\022\037.SourceStateUpdatedRequ" +
-      "estProto\032 .SourceStateUpdatedResponsePro" +
-      "to\022H\n\rqueryComplete\022\032.QueryCompleteReque" +
-      "stProto\032\033.QueryCompleteResponseProto\022T\n\021",
-      "terminateFragment\022\036.TerminateFragmentReq" +
-      "uestProto\032\037.TerminateFragmentResponsePro" +
-      "to2]\n\026LlapManagementProtocol\022C\n\022getDeleg" +
-      "ationToken\022\025.GetTokenRequestProto\032\026.GetT" +
-      "okenResponseProtoBH\n&org.apache.hadoop.h" +
-      "ive.llap.daemon.rpcB\030LlapDaemonProtocolP" +
-      "rotos\210\001\001\240\001\001"
+      "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" +
+      "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" +
+      "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" +
+      "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" +
+      "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" +
+      "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" +
+      "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" +
+      "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" +
+      "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" +
+      "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d",
+      "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" +
+      "\245\003\n\022SignableVertexSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016" +
+      "signatureKeyId\030\002 \001(\003\022/\n\020query_identifier" +
+      "\030\003 \001(\0132\025.QueryIdentifierProto\022\025\n\rhive_qu" +
+      "ery_id\030\004 \001(\t\022\020\n\010dag_name\030\005 \001(\t\022\023\n\013vertex" +
+      "_name\030\006 \001(\t\022\024\n\014vertex_index\030\007 \001(\005\022\030\n\020tok" +
+      "en_identifier\030\010 \001(\t\0224\n\024processor_descrip" +
+      "tor\030\t \001(\0132\026.EntityDescriptorProto\022!\n\013inp" +
+      "ut_specs\030\n \003(\0132\014.IOSpecProto\022\"\n\014output_s" +
+      "pecs\030\013 \003(\0132\014.IOSpecProto\0221\n\023grouped_inpu",
+      "t_specs\030\014 \003(\0132\024.GroupInputSpecProto\022\032\n\022v" +
+      "ertex_parallelism\030\r \001(\005\"K\n\016VertexOrBinar" +
+      "y\022#\n\006vertex\030\001 \001(\0132\023.SignableVertexSpec\022\024" +
+      "\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023FragmentRuntime" +
+      "Info\022#\n\033num_self_and_upstream_tasks\030\001 \001(" +
+      "\005\022-\n%num_self_and_upstream_completed_tas" +
+      "ks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001(\005\022\026\n" +
+      "\016dag_start_time\030\004 \001(\003\022 \n\030first_attempt_s" +
+      "tart_time\030\005 \001(\003\022\"\n\032current_attempt_start" +
+      "_time\030\006 \001(\003\"d\n\024QueryIdentifierProto\022\035\n\025a",
+      "pplication_id_string\030\001 \001(\t\022\021\n\tdag_index\030" +
+      "\002 \001(\005\022\032\n\022app_attempt_number\030\003 \001(\005\"l\n\013Not" +
+      "TezEvent\022\037\n\027input_event_proto_bytes\030\001 \002(" +
+      "\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017dest_input_nam" +
+      "e\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n\026SubmitWorkRe" +
+      "questProto\022\"\n\twork_spec\030\001 \001(\0132\017.VertexOr" +
+      "Binary\022\033\n\023work_spec_signature\030\002 \001(\014\022\027\n\017f" +
+      "ragment_number\030\003 \001(\005\022\026\n\016attempt_number\030\004" +
+      " \001(\005\022\033\n\023container_id_string\030\005 \001(\t\022\017\n\007am_" +
+      "host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005\022\032\n\022credentia",
+      "ls_binary\030\010 \001(\014\0223\n\025fragment_runtime_info" +
+      "\030\t \001(\0132\024.FragmentRuntimeInfo\022\033\n\023initial_" +
+      "event_bytes\030\n \001(\014\022\037\n\027initial_event_signa" +
+      "ture\030\013 \001(\014\"b\n\027SubmitWorkResponseProto\022/\n" +
+      "\020submission_state\030\001 \001(\0162\025.SubmissionStat" +
+      "eProto\022\026\n\016unique_node_id\030\002 \001(\t\"\205\001\n\036Sourc" +
+      "eStateUpdatedRequestProto\022/\n\020query_ident" +
+      "ifier\030\001 \001(\0132\025.QueryIdentifierProto\022\020\n\010sr" +
+      "c_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStat" +
+      "eProto\"!\n\037SourceStateUpdatedResponseProt",
+      "o\"e\n\031QueryCompleteRequestProto\022/\n\020query_" +
+      "identifier\030\001 \001(\0132\025.QueryIdentifierProto\022" +
+      "\027\n\014delete_delay\030\002 \001(\003:\0010\"\034\n\032QueryComplet" +
+      "eResponseProto\"t\n\035TerminateFragmentReque" +
+      "stProto\022/\n\020query_identifier\030\001 \001(\0132\025.Quer" +
+      "yIdentifierProto\022\"\n\032fragment_identifier_" +
+      "string\030\002 \001(\t\" \n\036TerminateFragmentRespons" +
+      "eProto\"&\n\024GetTokenRequestProto\022\016\n\006app_id" +
+      "\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token" +
+      "\030\001 \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n",
+      "\013fragment_id\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020Sou" +
+      "rceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNN" +
+      "ING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTE" +
+      "D\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n" +
+      "\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Subm" +
+      "itWorkRequestProto\032\030.SubmitWorkResponseP" +
+      "roto\022W\n\022sourceStateUpdated\022\037.SourceState" +
+      "UpdatedRequestProto\032 .SourceStateUpdated" +
+      "ResponseProto\022H\n\rqueryComplete\022\032.QueryCo" +
+      "mpleteRequestProto\032\033.QueryCompleteRespon",
+      "seProto\022T\n\021terminateFragment\022\036.Terminate" +
+      "FragmentRequestProto\032\037.TerminateFragment" +
+      "ResponseProto2]\n\026LlapManagementProtocol\022" +
+      "C\n\022getDelegationToken\022\025.GetTokenRequestP" +
+      "roto\032\026.GetTokenResponseProtoBH\n&org.apac" +
+      "he.hadoop.hive.llap.daemon.rpcB\030LlapDaem" +
+      "onProtocolProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17394,7 +17559,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_SubmitWorkResponseProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_SubmitWorkResponseProto_descriptor,
-              new java.lang.String[] { "SubmissionState", });
+              new java.lang.String[] { "SubmissionState", "UniqueNodeId", });
           internal_static_SourceStateUpdatedRequestProto_descriptor =
             getDescriptor().getMessageTypes().get(11);
           internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
index ea47330..89b0152 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/DaemonId.java
@@ -14,21 +14,24 @@
 
 package org.apache.hadoop.hive.llap;
 
+import java.util.UUID;
+
 public class DaemonId {
   private final String userName;
   private final String clusterName;
   private final String appId;
   private final String hostName;
-  private final long startTime;
+  private final long startTimeMs;
+  private final String uuidString;
 
-  public DaemonId(String userName, String clusterName, String hostName, String appId,
-      long startTime) {
+  public DaemonId(
+      String userName, String clusterName, String hostName, String appId, long startTime) {
     this.userName = userName;
     this.clusterName = clusterName;
     this.appId = appId;
     this.hostName = hostName;
-    this.startTime = startTime;
-    // TODO: we could also get an unique number per daemon.
+    this.startTimeMs = startTime;
+    this.uuidString = UUID.randomUUID().toString();
   }
 
   public String getClusterString() {
@@ -45,4 +48,8 @@ public class DaemonId {
   public String getApplicationId() {
     return appId;
   }
+
+  public String getUniqueNodeIdInCluster() {
+    return hostName + "_" + startTimeMs + "_" + uuidString;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index 9549567..dbfe54b 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -35,7 +35,7 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
-  public void nodeHeartbeat(Text hostname, int port) throws IOException;
+  public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException;
 
   public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 2e74c18..3a3a2b8 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -140,6 +140,7 @@ enum SubmissionStateProto {
 
 message SubmitWorkResponseProto {
   optional SubmissionStateProto submission_state = 1;
+  optional string unique_node_id = 2;
 }
 
 message SourceStateUpdatedRequestProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 04c28cb..93f8073 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -41,6 +41,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DaemonId;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
@@ -85,7 +86,7 @@ public class AMReporter extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
 
-  private volatile LlapNodeId nodeId;
+  private LlapNodeId nodeId;
   private final QueryFailedHandler queryFailedHandler;
   private final Configuration conf;
   private final ListeningExecutorService queueLookupExecutor;
@@ -101,13 +102,15 @@ public class AMReporter extends AbstractService {
   // messages like taskKilled, etc.
   private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
+  private final DaemonId daemonId;
 
   public AMReporter(AtomicReference<InetSocketAddress> localAddress,
-                    QueryFailedHandler queryFailedHandler, Configuration conf) {
+      QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
     this.queryFailedHandler = queryFailedHandler;
     this.conf = conf;
+    this.daemonId = daemonId;
     ExecutorService rawExecutor = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
@@ -154,8 +157,9 @@ public class AMReporter extends AbstractService {
         }
       }
     });
+    // TODO: why is this needed? We could just save the host and port instead.
     nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
-    LOG.info("AMReporter running with NodeId: {}", nodeId);
+    LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", daemonId, nodeId);
   }
 
   @Override
@@ -336,7 +340,7 @@ public class AMReporter extends AbstractService {
             LOG.trace("NodeHeartbeat to: " + amNodeInfo);
           }
           amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
-              nodeId.getPort());
+              new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort());
         } catch (IOException e) {
           QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
           amNodeInfo.setAmFailed(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 91a321d..1346050 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -102,6 +102,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private final HadoopShim tezHadoopShim;
   private final LlapSignerImpl signer;
   private final String clusterId;
+  private final DaemonId daemonId;
   private final UgiFactory fsUgiFactory;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
@@ -120,6 +121,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     this.fsUgiFactory = fsUgiFactory;
 
     this.clusterId = daemonId.getClusterString();
+    this.daemonId = daemonId;
     this.queryTracker = new QueryTracker(conf, localDirsBase, clusterId);
     addIfService(queryTracker);
     String waitQueueSchedulerClassName = HiveConf.getVar(
@@ -262,8 +264,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       NDC.clear();
     }
 
-    responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
-    return responseBuilder.build();
+    return responseBuilder.setUniqueNodeId(daemonId.getUniqueNodeIdInCluster())
+        .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()))
+        .build();
   }
 
   private SignableVertexSpec extractVertexSpec(SubmitWorkRequestProto request,

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 752e6ee..d90b156 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -239,7 +239,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
+    this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf, daemonId);
 
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/8804a7b8/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 0deebf9..c692581 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -310,8 +310,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
     // Have to register this up front right now. Otherwise, it's possible for the task to start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
-    getContext()
-        .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+    getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
     communicator.sendSubmitWork(requestProto, host, port,
         new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
           @Override
@@ -331,6 +330,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
               // This should never happen as server always returns a valid status on success
               throw new RuntimeException("SubmissionState in response is expected!");
             }
+            if (response.hasUniqueNodeId()) {
+              entityTracker.registerTaskSubmittedToNode(
+                  taskSpec.getTaskAttemptID(), response.getUniqueNodeId());
+            }
             LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
           }
 
@@ -562,7 +565,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
   private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
 
-  void nodePinged(String hostname, int port) {
+  void nodePinged(String hostname, String uniqueId, int port) {
+    // TODO: do we ever need the port? We could do away with nodeId altogether.
     LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
     registerPingingNode(nodeId);
     BiMap<ContainerId, TezTaskAttemptID> biMap =
@@ -570,8 +574,19 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     if (biMap != null) {
       synchronized (biMap) {
         for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-          getContext().taskAlive(entry.getValue());
-          getContext().containerAlive(entry.getKey());
+          // TODO: this is a stopgap fix. We really need to change all mappings by unique node ID,
+          //       or at least (in this case) track the latest unique ID for LlapNode and retry all
+          //       older-node tasks proactively. For now let the heartbeats fail them.
+          TezTaskAttemptID attemptId = entry.getValue();
+          String taskNodeId = entityTracker.getUniqueNodeId(attemptId);
+          // The unique ID is registered from the SubmitWork response. We could get a ping while
+          // the task is valid but before its unique ID is stored, in which case taskNodeId is null.
+          // Subsequent heartbeats should see the value and mark the task as alive. In case of any
+          // discrepancy in the EntityTracker, we prefer a missed heartbeat over a stuck query.
+          if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
+            getContext().taskAlive(entry.getValue());
+            getContext().containerAlive(entry.getKey());
+          }
         }
       }
     } else {
@@ -664,10 +679,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     }
 
     @Override
-    public void nodeHeartbeat(Text hostname, int port) throws IOException {
-      nodePinged(hostname.toString(), port);
+    public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
+      nodePinged(hostname.toString(), uniqueId.toString(), port);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
+        LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
       }
     }
 
@@ -697,12 +712,17 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
    */
   @VisibleForTesting
   static final class EntityTracker {
+    // TODO: document how these maps are kept consistent with each other.
     @VisibleForTesting
     final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
     @VisibleForTesting
     final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
     @VisibleForTesting
     final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
+    // TODO: we currently record task info everywhere before we submit the task and learn the
+    //       "real" (unique) node ID, so that ID is stored separately here. Ideally, uniqueness
+    //       should be rolled into LlapNodeId; the node info we get from the registry could include it.
+    private final ConcurrentMap<TezTaskAttemptID, String> uniqueNodeMap = new ConcurrentHashMap<>();
 
     void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
       if (LOG.isDebugEnabled()) {
@@ -726,7 +746,20 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       nodeMap.putIfAbsent(llapNodeId, usedInstance);
     }
 
+    public String getUniqueNodeId(TezTaskAttemptID attemptId) {
+      return uniqueNodeMap.get(attemptId);
+    }
+
+    public void registerTaskSubmittedToNode(
+        TezTaskAttemptID taskAttemptID, String uniqueNodeId) {
+      String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
+      if (prev != null) {
+        LOG.warn("Replaced the unique node mapping for task from " + prev + " to " + uniqueNodeId);
+      }
+    }
+
     void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+      uniqueNodeMap.remove(attemptId);
       LlapNodeId llapNodeId = attemptToNodeMap.remove(attemptId);
       if (llapNodeId == null) {
         // Possible since either container / task can be unregistered.
@@ -820,6 +853,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       // Remove the container mapping
       if (matched != null) {
         attemptToNodeMap.remove(matched);
+        uniqueNodeMap.remove(matched);
       }
     }
 
@@ -834,8 +868,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
      * @return
      */
     BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
-      BiMap<ContainerId, TezTaskAttemptID> biMap = nodeMap.get(llapNodeId);
-      return biMap;
+      return nodeMap.get(llapNodeId);
     }
 
   }