You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/05/28 11:02:51 UTC

[2/2] hive git commit: HIVE-10846. LLAP: preemption in AM due to failures / out of order scheduling. (Siddharth Seth)

HIVE-10846. LLAP: preemption in AM due to failures / out of order scheduling. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b8b94f29
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b8b94f29
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b8b94f29

Branch: refs/heads/llap
Commit: b8b94f2979b96f97b8cfe98745a8e016ee6333b5
Parents: 2024c96
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 28 02:02:17 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 28 02:02:17 2015 -0700

----------------------------------------------------------------------
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 599 +++++--------------
 .../hive/llap/daemon/impl/AMReporter.java       |   3 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  24 +-
 .../hadoop/hive/llap/daemon/impl/Scheduler.java |   6 +
 .../llap/daemon/impl/TaskExecutorService.java   | 148 +++--
 .../llap/daemon/impl/TaskRunnerCallable.java    |  30 +-
 .../hadoop/hive/llap/tezplugins/Converters.java |   4 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   | 106 +++-
 .../hive/llap/tezplugins/TaskCommunicator.java  |  80 +--
 .../dag/app/rm/LlapTaskSchedulerService.java    | 268 ++++++++-
 .../src/protobuf/LlapDaemonProtocol.proto       |   7 +-
 .../daemon/impl/TestTaskExecutorService.java    |  82 +--
 .../app/rm/TestLlapTaskSchedulerService.java    |  74 ++-
 13 files changed, 813 insertions(+), 618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index d378955..b044df9 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -3124,20 +3124,20 @@ public final class LlapDaemonProtocolProtos {
   public interface FragmentSpecProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // optional string task_attempt_id_string = 1;
+    // optional string fragment_identifier_string = 1;
     /**
-     * <code>optional string task_attempt_id_string = 1;</code>
+     * <code>optional string fragment_identifier_string = 1;</code>
      */
-    boolean hasTaskAttemptIdString();
+    boolean hasFragmentIdentifierString();
     /**
-     * <code>optional string task_attempt_id_string = 1;</code>
+     * <code>optional string fragment_identifier_string = 1;</code>
      */
-    java.lang.String getTaskAttemptIdString();
+    java.lang.String getFragmentIdentifierString();
     /**
-     * <code>optional string task_attempt_id_string = 1;</code>
+     * <code>optional string fragment_identifier_string = 1;</code>
      */
     com.google.protobuf.ByteString
-        getTaskAttemptIdStringBytes();
+        getFragmentIdentifierStringBytes();
 
     // optional string dag_name = 2;
     /**
@@ -3341,7 +3341,7 @@ public final class LlapDaemonProtocolProtos {
             }
             case 10: {
               bitField0_ |= 0x00000001;
-              taskAttemptIdString_ = input.readBytes();
+              fragmentIdentifierString_ = input.readBytes();
               break;
             }
             case 18: {
@@ -3455,20 +3455,20 @@ public final class LlapDaemonProtocolProtos {
     }
 
     private int bitField0_;
-    // optional string task_attempt_id_string = 1;
-    public static final int TASK_ATTEMPT_ID_STRING_FIELD_NUMBER = 1;
-    private java.lang.Object taskAttemptIdString_;
+    // optional string fragment_identifier_string = 1;
+    public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 1;
+    private java.lang.Object fragmentIdentifierString_;
     /**
-     * <code>optional string task_attempt_id_string = 1;</code>
+     * <code>optional string fragment_identifier_string = 1;</code>
      */
-    public boolean hasTaskAttemptIdString() {
+    public boolean hasFragmentIdentifierString() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>optional string task_attempt_id_string = 1;</code>
+     * <code>optional string fragment_identifier_string = 1;</code>
      */
-    public java.lang.String getTaskAttemptIdString() {
-      java.lang.Object ref = taskAttemptIdString_;
+    public java.lang.String getFragmentIdentifierString() {
+      java.lang.Object ref = fragmentIdentifierString_;
       if (ref instanceof java.lang.String) {
         return (java.lang.String) ref;
       } else {
@@ -3476,22 +3476,22 @@ public final class LlapDaemonProtocolProtos {
             (com.google.protobuf.ByteString) ref;
         java.lang.String s = bs.toStringUtf8();
         if (bs.isValidUtf8()) {
-          taskAttemptIdString_ = s;
+          fragmentIdentifierString_ = s;
         }
         return s;
       }
     }
     /**
-     * <code>optional string task_attempt_id_string = 1;</code>
+     * <code>optional string fragment_identifier_string = 1;</code>
      */
     public com.google.protobuf.ByteString
-        getTaskAttemptIdStringBytes() {
-      java.lang.Object ref = taskAttemptIdString_;
+        getFragmentIdentifierStringBytes() {
+      java.lang.Object ref = fragmentIdentifierString_;
       if (ref instanceof java.lang.String) {
         com.google.protobuf.ByteString b = 
             com.google.protobuf.ByteString.copyFromUtf8(
                 (java.lang.String) ref);
-        taskAttemptIdString_ = b;
+        fragmentIdentifierString_ = b;
         return b;
       } else {
         return (com.google.protobuf.ByteString) ref;
@@ -3763,7 +3763,7 @@ public final class LlapDaemonProtocolProtos {
     }
 
     private void initFields() {
-      taskAttemptIdString_ = "";
+      fragmentIdentifierString_ = "";
       dagName_ = "";
       vertexName_ = "";
       processorDescriptor_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.getDefaultInstance();
@@ -3787,7 +3787,7 @@ public final class LlapDaemonProtocolProtos {
                         throws java.io.IOException {
       getSerializedSize();
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
-        output.writeBytes(1, getTaskAttemptIdStringBytes());
+        output.writeBytes(1, getFragmentIdentifierStringBytes());
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, getDagNameBytes());
@@ -3827,7 +3827,7 @@ public final class LlapDaemonProtocolProtos {
       size = 0;
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeBytesSize(1, getTaskAttemptIdStringBytes());
+          .computeBytesSize(1, getFragmentIdentifierStringBytes());
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         size += com.google.protobuf.CodedOutputStream
@@ -3888,10 +3888,10 @@ public final class LlapDaemonProtocolProtos {
       org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto) obj;
 
       boolean result = true;
-      result = result && (hasTaskAttemptIdString() == other.hasTaskAttemptIdString());
-      if (hasTaskAttemptIdString()) {
-        result = result && getTaskAttemptIdString()
-            .equals(other.getTaskAttemptIdString());
+      result = result && (hasFragmentIdentifierString() == other.hasFragmentIdentifierString());
+      if (hasFragmentIdentifierString()) {
+        result = result && getFragmentIdentifierString()
+            .equals(other.getFragmentIdentifierString());
       }
       result = result && (hasDagName() == other.hasDagName());
       if (hasDagName()) {
@@ -3942,9 +3942,9 @@ public final class LlapDaemonProtocolProtos {
       }
       int hash = 41;
       hash = (19 * hash) + getDescriptorForType().hashCode();
-      if (hasTaskAttemptIdString()) {
-        hash = (37 * hash) + TASK_ATTEMPT_ID_STRING_FIELD_NUMBER;
-        hash = (53 * hash) + getTaskAttemptIdString().hashCode();
+      if (hasFragmentIdentifierString()) {
+        hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER;
+        hash = (53 * hash) + getFragmentIdentifierString().hashCode();
       }
       if (hasDagName()) {
         hash = (37 * hash) + DAG_NAME_FIELD_NUMBER;
@@ -4095,7 +4095,7 @@ public final class LlapDaemonProtocolProtos {
 
       public Builder clear() {
         super.clear();
-        taskAttemptIdString_ = "";
+        fragmentIdentifierString_ = "";
         bitField0_ = (bitField0_ & ~0x00000001);
         dagName_ = "";
         bitField0_ = (bitField0_ & ~0x00000002);
@@ -4162,7 +4162,7 @@ public final class LlapDaemonProtocolProtos {
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
-        result.taskAttemptIdString_ = taskAttemptIdString_;
+        result.fragmentIdentifierString_ = fragmentIdentifierString_;
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
@@ -4234,9 +4234,9 @@ public final class LlapDaemonProtocolProtos {
 
       public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto other) {
         if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance()) return this;
-        if (other.hasTaskAttemptIdString()) {
+        if (other.hasFragmentIdentifierString()) {
           bitField0_ |= 0x00000001;
-          taskAttemptIdString_ = other.taskAttemptIdString_;
+          fragmentIdentifierString_ = other.fragmentIdentifierString_;
           onChanged();
         }
         if (other.hasDagName()) {
@@ -4366,76 +4366,76 @@ public final class LlapDaemonProtocolProtos {
       }
       private int bitField0_;
 
-      // optional string task_attempt_id_string = 1;
-      private java.lang.Object taskAttemptIdString_ = "";
+      // optional string fragment_identifier_string = 1;
+      private java.lang.Object fragmentIdentifierString_ = "";
       /**
-       * <code>optional string task_attempt_id_string = 1;</code>
+       * <code>optional string fragment_identifier_string = 1;</code>
        */
-      public boolean hasTaskAttemptIdString() {
+      public boolean hasFragmentIdentifierString() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>optional string task_attempt_id_string = 1;</code>
+       * <code>optional string fragment_identifier_string = 1;</code>
        */
-      public java.lang.String getTaskAttemptIdString() {
-        java.lang.Object ref = taskAttemptIdString_;
+      public java.lang.String getFragmentIdentifierString() {
+        java.lang.Object ref = fragmentIdentifierString_;
         if (!(ref instanceof java.lang.String)) {
           java.lang.String s = ((com.google.protobuf.ByteString) ref)
               .toStringUtf8();
-          taskAttemptIdString_ = s;
+          fragmentIdentifierString_ = s;
           return s;
         } else {
           return (java.lang.String) ref;
         }
       }
       /**
-       * <code>optional string task_attempt_id_string = 1;</code>
+       * <code>optional string fragment_identifier_string = 1;</code>
        */
       public com.google.protobuf.ByteString
-          getTaskAttemptIdStringBytes() {
-        java.lang.Object ref = taskAttemptIdString_;
+          getFragmentIdentifierStringBytes() {
+        java.lang.Object ref = fragmentIdentifierString_;
         if (ref instanceof String) {
           com.google.protobuf.ByteString b = 
               com.google.protobuf.ByteString.copyFromUtf8(
                   (java.lang.String) ref);
-          taskAttemptIdString_ = b;
+          fragmentIdentifierString_ = b;
           return b;
         } else {
           return (com.google.protobuf.ByteString) ref;
         }
       }
       /**
-       * <code>optional string task_attempt_id_string = 1;</code>
+       * <code>optional string fragment_identifier_string = 1;</code>
        */
-      public Builder setTaskAttemptIdString(
+      public Builder setFragmentIdentifierString(
           java.lang.String value) {
         if (value == null) {
     throw new NullPointerException();
   }
   bitField0_ |= 0x00000001;
-        taskAttemptIdString_ = value;
+        fragmentIdentifierString_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>optional string task_attempt_id_string = 1;</code>
+       * <code>optional string fragment_identifier_string = 1;</code>
        */
-      public Builder clearTaskAttemptIdString() {
+      public Builder clearFragmentIdentifierString() {
         bitField0_ = (bitField0_ & ~0x00000001);
-        taskAttemptIdString_ = getDefaultInstance().getTaskAttemptIdString();
+        fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString();
         onChanged();
         return this;
       }
       /**
-       * <code>optional string task_attempt_id_string = 1;</code>
+       * <code>optional string fragment_identifier_string = 1;</code>
        */
-      public Builder setTaskAttemptIdStringBytes(
+      public Builder setFragmentIdentifierStringBytes(
           com.google.protobuf.ByteString value) {
         if (value == null) {
     throw new NullPointerException();
   }
   bitField0_ |= 0x00000001;
-        taskAttemptIdString_ = value;
+        fragmentIdentifierString_ = value;
         onChanged();
         return this;
       }
@@ -10847,50 +10847,20 @@ public final class LlapDaemonProtocolProtos {
     com.google.protobuf.ByteString
         getDagNameBytes();
 
-    // optional int32 dag_attempt_number = 3;
+    // optional string fragment_identifier_string = 7;
     /**
-     * <code>optional int32 dag_attempt_number = 3;</code>
+     * <code>optional string fragment_identifier_string = 7;</code>
      */
-    boolean hasDagAttemptNumber();
+    boolean hasFragmentIdentifierString();
     /**
-     * <code>optional int32 dag_attempt_number = 3;</code>
+     * <code>optional string fragment_identifier_string = 7;</code>
      */
-    int getDagAttemptNumber();
-
-    // optional string vertex_name = 4;
-    /**
-     * <code>optional string vertex_name = 4;</code>
-     */
-    boolean hasVertexName();
+    java.lang.String getFragmentIdentifierString();
     /**
-     * <code>optional string vertex_name = 4;</code>
-     */
-    java.lang.String getVertexName();
-    /**
-     * <code>optional string vertex_name = 4;</code>
+     * <code>optional string fragment_identifier_string = 7;</code>
      */
     com.google.protobuf.ByteString
-        getVertexNameBytes();
-
-    // optional int32 fragment_number = 5;
-    /**
-     * <code>optional int32 fragment_number = 5;</code>
-     */
-    boolean hasFragmentNumber();
-    /**
-     * <code>optional int32 fragment_number = 5;</code>
-     */
-    int getFragmentNumber();
-
-    // optional int32 attempt_number = 6;
-    /**
-     * <code>optional int32 attempt_number = 6;</code>
-     */
-    boolean hasAttemptNumber();
-    /**
-     * <code>optional int32 attempt_number = 6;</code>
-     */
-    int getAttemptNumber();
+        getFragmentIdentifierStringBytes();
   }
   /**
    * Protobuf type {@code TerminateFragmentRequestProto}
@@ -10953,24 +10923,9 @@ public final class LlapDaemonProtocolProtos {
               dagName_ = input.readBytes();
               break;
             }
-            case 24: {
+            case 58: {
               bitField0_ |= 0x00000004;
-              dagAttemptNumber_ = input.readInt32();
-              break;
-            }
-            case 34: {
-              bitField0_ |= 0x00000008;
-              vertexName_ = input.readBytes();
-              break;
-            }
-            case 40: {
-              bitField0_ |= 0x00000010;
-              fragmentNumber_ = input.readInt32();
-              break;
-            }
-            case 48: {
-              bitField0_ |= 0x00000020;
-              attemptNumber_ = input.readInt32();
+              fragmentIdentifierString_ = input.readBytes();
               break;
             }
           }
@@ -11099,36 +11054,20 @@ public final class LlapDaemonProtocolProtos {
       }
     }
 
-    // optional int32 dag_attempt_number = 3;
-    public static final int DAG_ATTEMPT_NUMBER_FIELD_NUMBER = 3;
-    private int dagAttemptNumber_;
+    // optional string fragment_identifier_string = 7;
+    public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 7;
+    private java.lang.Object fragmentIdentifierString_;
     /**
-     * <code>optional int32 dag_attempt_number = 3;</code>
+     * <code>optional string fragment_identifier_string = 7;</code>
      */
-    public boolean hasDagAttemptNumber() {
+    public boolean hasFragmentIdentifierString() {
       return ((bitField0_ & 0x00000004) == 0x00000004);
     }
     /**
-     * <code>optional int32 dag_attempt_number = 3;</code>
-     */
-    public int getDagAttemptNumber() {
-      return dagAttemptNumber_;
-    }
-
-    // optional string vertex_name = 4;
-    public static final int VERTEX_NAME_FIELD_NUMBER = 4;
-    private java.lang.Object vertexName_;
-    /**
-     * <code>optional string vertex_name = 4;</code>
+     * <code>optional string fragment_identifier_string = 7;</code>
      */
-    public boolean hasVertexName() {
-      return ((bitField0_ & 0x00000008) == 0x00000008);
-    }
-    /**
-     * <code>optional string vertex_name = 4;</code>
-     */
-    public java.lang.String getVertexName() {
-      java.lang.Object ref = vertexName_;
+    public java.lang.String getFragmentIdentifierString() {
+      java.lang.Object ref = fragmentIdentifierString_;
       if (ref instanceof java.lang.String) {
         return (java.lang.String) ref;
       } else {
@@ -11136,67 +11075,32 @@ public final class LlapDaemonProtocolProtos {
             (com.google.protobuf.ByteString) ref;
         java.lang.String s = bs.toStringUtf8();
         if (bs.isValidUtf8()) {
-          vertexName_ = s;
+          fragmentIdentifierString_ = s;
         }
         return s;
       }
     }
     /**
-     * <code>optional string vertex_name = 4;</code>
+     * <code>optional string fragment_identifier_string = 7;</code>
      */
     public com.google.protobuf.ByteString
-        getVertexNameBytes() {
-      java.lang.Object ref = vertexName_;
+        getFragmentIdentifierStringBytes() {
+      java.lang.Object ref = fragmentIdentifierString_;
       if (ref instanceof java.lang.String) {
         com.google.protobuf.ByteString b = 
             com.google.protobuf.ByteString.copyFromUtf8(
                 (java.lang.String) ref);
-        vertexName_ = b;
+        fragmentIdentifierString_ = b;
         return b;
       } else {
         return (com.google.protobuf.ByteString) ref;
       }
     }
 
-    // optional int32 fragment_number = 5;
-    public static final int FRAGMENT_NUMBER_FIELD_NUMBER = 5;
-    private int fragmentNumber_;
-    /**
-     * <code>optional int32 fragment_number = 5;</code>
-     */
-    public boolean hasFragmentNumber() {
-      return ((bitField0_ & 0x00000010) == 0x00000010);
-    }
-    /**
-     * <code>optional int32 fragment_number = 5;</code>
-     */
-    public int getFragmentNumber() {
-      return fragmentNumber_;
-    }
-
-    // optional int32 attempt_number = 6;
-    public static final int ATTEMPT_NUMBER_FIELD_NUMBER = 6;
-    private int attemptNumber_;
-    /**
-     * <code>optional int32 attempt_number = 6;</code>
-     */
-    public boolean hasAttemptNumber() {
-      return ((bitField0_ & 0x00000020) == 0x00000020);
-    }
-    /**
-     * <code>optional int32 attempt_number = 6;</code>
-     */
-    public int getAttemptNumber() {
-      return attemptNumber_;
-    }
-
     private void initFields() {
       queryId_ = "";
       dagName_ = "";
-      dagAttemptNumber_ = 0;
-      vertexName_ = "";
-      fragmentNumber_ = 0;
-      attemptNumber_ = 0;
+      fragmentIdentifierString_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -11217,16 +11121,7 @@ public final class LlapDaemonProtocolProtos {
         output.writeBytes(2, getDagNameBytes());
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
-        output.writeInt32(3, dagAttemptNumber_);
-      }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
-        output.writeBytes(4, getVertexNameBytes());
-      }
-      if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        output.writeInt32(5, fragmentNumber_);
-      }
-      if (((bitField0_ & 0x00000020) == 0x00000020)) {
-        output.writeInt32(6, attemptNumber_);
+        output.writeBytes(7, getFragmentIdentifierStringBytes());
       }
       getUnknownFields().writeTo(output);
     }
@@ -11247,19 +11142,7 @@ public final class LlapDaemonProtocolProtos {
       }
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeInt32Size(3, dagAttemptNumber_);
-      }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBytesSize(4, getVertexNameBytes());
-      }
-      if (((bitField0_ & 0x00000010) == 0x00000010)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeInt32Size(5, fragmentNumber_);
-      }
-      if (((bitField0_ & 0x00000020) == 0x00000020)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeInt32Size(6, attemptNumber_);
+          .computeBytesSize(7, getFragmentIdentifierStringBytes());
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
@@ -11294,25 +11177,10 @@ public final class LlapDaemonProtocolProtos {
         result = result && getDagName()
             .equals(other.getDagName());
       }
-      result = result && (hasDagAttemptNumber() == other.hasDagAttemptNumber());
-      if (hasDagAttemptNumber()) {
-        result = result && (getDagAttemptNumber()
-            == other.getDagAttemptNumber());
-      }
-      result = result && (hasVertexName() == other.hasVertexName());
-      if (hasVertexName()) {
-        result = result && getVertexName()
-            .equals(other.getVertexName());
-      }
-      result = result && (hasFragmentNumber() == other.hasFragmentNumber());
-      if (hasFragmentNumber()) {
-        result = result && (getFragmentNumber()
-            == other.getFragmentNumber());
-      }
-      result = result && (hasAttemptNumber() == other.hasAttemptNumber());
-      if (hasAttemptNumber()) {
-        result = result && (getAttemptNumber()
-            == other.getAttemptNumber());
+      result = result && (hasFragmentIdentifierString() == other.hasFragmentIdentifierString());
+      if (hasFragmentIdentifierString()) {
+        result = result && getFragmentIdentifierString()
+            .equals(other.getFragmentIdentifierString());
       }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
@@ -11335,21 +11203,9 @@ public final class LlapDaemonProtocolProtos {
         hash = (37 * hash) + DAG_NAME_FIELD_NUMBER;
         hash = (53 * hash) + getDagName().hashCode();
       }
-      if (hasDagAttemptNumber()) {
-        hash = (37 * hash) + DAG_ATTEMPT_NUMBER_FIELD_NUMBER;
-        hash = (53 * hash) + getDagAttemptNumber();
-      }
-      if (hasVertexName()) {
-        hash = (37 * hash) + VERTEX_NAME_FIELD_NUMBER;
-        hash = (53 * hash) + getVertexName().hashCode();
-      }
-      if (hasFragmentNumber()) {
-        hash = (37 * hash) + FRAGMENT_NUMBER_FIELD_NUMBER;
-        hash = (53 * hash) + getFragmentNumber();
-      }
-      if (hasAttemptNumber()) {
-        hash = (37 * hash) + ATTEMPT_NUMBER_FIELD_NUMBER;
-        hash = (53 * hash) + getAttemptNumber();
+      if (hasFragmentIdentifierString()) {
+        hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER;
+        hash = (53 * hash) + getFragmentIdentifierString().hashCode();
       }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
@@ -11464,14 +11320,8 @@ public final class LlapDaemonProtocolProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         dagName_ = "";
         bitField0_ = (bitField0_ & ~0x00000002);
-        dagAttemptNumber_ = 0;
+        fragmentIdentifierString_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
-        vertexName_ = "";
-        bitField0_ = (bitField0_ & ~0x00000008);
-        fragmentNumber_ = 0;
-        bitField0_ = (bitField0_ & ~0x00000010);
-        attemptNumber_ = 0;
-        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -11511,19 +11361,7 @@ public final class LlapDaemonProtocolProtos {
         if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
           to_bitField0_ |= 0x00000004;
         }
-        result.dagAttemptNumber_ = dagAttemptNumber_;
-        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
-          to_bitField0_ |= 0x00000008;
-        }
-        result.vertexName_ = vertexName_;
-        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
-          to_bitField0_ |= 0x00000010;
-        }
-        result.fragmentNumber_ = fragmentNumber_;
-        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
-          to_bitField0_ |= 0x00000020;
-        }
-        result.attemptNumber_ = attemptNumber_;
+        result.fragmentIdentifierString_ = fragmentIdentifierString_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -11550,20 +11388,11 @@ public final class LlapDaemonProtocolProtos {
           dagName_ = other.dagName_;
           onChanged();
         }
-        if (other.hasDagAttemptNumber()) {
-          setDagAttemptNumber(other.getDagAttemptNumber());
-        }
-        if (other.hasVertexName()) {
-          bitField0_ |= 0x00000008;
-          vertexName_ = other.vertexName_;
+        if (other.hasFragmentIdentifierString()) {
+          bitField0_ |= 0x00000004;
+          fragmentIdentifierString_ = other.fragmentIdentifierString_;
           onChanged();
         }
-        if (other.hasFragmentNumber()) {
-          setFragmentNumber(other.getFragmentNumber());
-        }
-        if (other.hasAttemptNumber()) {
-          setAttemptNumber(other.getAttemptNumber());
-        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11739,175 +11568,76 @@ public final class LlapDaemonProtocolProtos {
         return this;
       }
 
-      // optional int32 dag_attempt_number = 3;
-      private int dagAttemptNumber_ ;
+      // optional string fragment_identifier_string = 7;
+      private java.lang.Object fragmentIdentifierString_ = "";
       /**
-       * <code>optional int32 dag_attempt_number = 3;</code>
+       * <code>optional string fragment_identifier_string = 7;</code>
        */
-      public boolean hasDagAttemptNumber() {
+      public boolean hasFragmentIdentifierString() {
         return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       /**
-       * <code>optional int32 dag_attempt_number = 3;</code>
-       */
-      public int getDagAttemptNumber() {
-        return dagAttemptNumber_;
-      }
-      /**
-       * <code>optional int32 dag_attempt_number = 3;</code>
+       * <code>optional string fragment_identifier_string = 7;</code>
        */
-      public Builder setDagAttemptNumber(int value) {
-        bitField0_ |= 0x00000004;
-        dagAttemptNumber_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional int32 dag_attempt_number = 3;</code>
-       */
-      public Builder clearDagAttemptNumber() {
-        bitField0_ = (bitField0_ & ~0x00000004);
-        dagAttemptNumber_ = 0;
-        onChanged();
-        return this;
-      }
-
-      // optional string vertex_name = 4;
-      private java.lang.Object vertexName_ = "";
-      /**
-       * <code>optional string vertex_name = 4;</code>
-       */
-      public boolean hasVertexName() {
-        return ((bitField0_ & 0x00000008) == 0x00000008);
-      }
-      /**
-       * <code>optional string vertex_name = 4;</code>
-       */
-      public java.lang.String getVertexName() {
-        java.lang.Object ref = vertexName_;
+      public java.lang.String getFragmentIdentifierString() {
+        java.lang.Object ref = fragmentIdentifierString_;
         if (!(ref instanceof java.lang.String)) {
           java.lang.String s = ((com.google.protobuf.ByteString) ref)
               .toStringUtf8();
-          vertexName_ = s;
+          fragmentIdentifierString_ = s;
           return s;
         } else {
           return (java.lang.String) ref;
         }
       }
       /**
-       * <code>optional string vertex_name = 4;</code>
+       * <code>optional string fragment_identifier_string = 7;</code>
        */
       public com.google.protobuf.ByteString
-          getVertexNameBytes() {
-        java.lang.Object ref = vertexName_;
+          getFragmentIdentifierStringBytes() {
+        java.lang.Object ref = fragmentIdentifierString_;
         if (ref instanceof String) {
           com.google.protobuf.ByteString b = 
               com.google.protobuf.ByteString.copyFromUtf8(
                   (java.lang.String) ref);
-          vertexName_ = b;
+          fragmentIdentifierString_ = b;
           return b;
         } else {
           return (com.google.protobuf.ByteString) ref;
         }
       }
       /**
-       * <code>optional string vertex_name = 4;</code>
+       * <code>optional string fragment_identifier_string = 7;</code>
        */
-      public Builder setVertexName(
+      public Builder setFragmentIdentifierString(
           java.lang.String value) {
         if (value == null) {
     throw new NullPointerException();
   }
-  bitField0_ |= 0x00000008;
-        vertexName_ = value;
+  bitField0_ |= 0x00000004;
+        fragmentIdentifierString_ = value;
         onChanged();
         return this;
       }
       /**
-       * <code>optional string vertex_name = 4;</code>
+       * <code>optional string fragment_identifier_string = 7;</code>
        */
-      public Builder clearVertexName() {
-        bitField0_ = (bitField0_ & ~0x00000008);
-        vertexName_ = getDefaultInstance().getVertexName();
+      public Builder clearFragmentIdentifierString() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString();
         onChanged();
         return this;
       }
       /**
-       * <code>optional string vertex_name = 4;</code>
+       * <code>optional string fragment_identifier_string = 7;</code>
        */
-      public Builder setVertexNameBytes(
+      public Builder setFragmentIdentifierStringBytes(
           com.google.protobuf.ByteString value) {
         if (value == null) {
     throw new NullPointerException();
   }
-  bitField0_ |= 0x00000008;
-        vertexName_ = value;
-        onChanged();
-        return this;
-      }
-
-      // optional int32 fragment_number = 5;
-      private int fragmentNumber_ ;
-      /**
-       * <code>optional int32 fragment_number = 5;</code>
-       */
-      public boolean hasFragmentNumber() {
-        return ((bitField0_ & 0x00000010) == 0x00000010);
-      }
-      /**
-       * <code>optional int32 fragment_number = 5;</code>
-       */
-      public int getFragmentNumber() {
-        return fragmentNumber_;
-      }
-      /**
-       * <code>optional int32 fragment_number = 5;</code>
-       */
-      public Builder setFragmentNumber(int value) {
-        bitField0_ |= 0x00000010;
-        fragmentNumber_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional int32 fragment_number = 5;</code>
-       */
-      public Builder clearFragmentNumber() {
-        bitField0_ = (bitField0_ & ~0x00000010);
-        fragmentNumber_ = 0;
-        onChanged();
-        return this;
-      }
-
-      // optional int32 attempt_number = 6;
-      private int attemptNumber_ ;
-      /**
-       * <code>optional int32 attempt_number = 6;</code>
-       */
-      public boolean hasAttemptNumber() {
-        return ((bitField0_ & 0x00000020) == 0x00000020);
-      }
-      /**
-       * <code>optional int32 attempt_number = 6;</code>
-       */
-      public int getAttemptNumber() {
-        return attemptNumber_;
-      }
-      /**
-       * <code>optional int32 attempt_number = 6;</code>
-       */
-      public Builder setAttemptNumber(int value) {
-        bitField0_ |= 0x00000020;
-        attemptNumber_ = value;
-        onChanged();
-        return this;
-      }
-      /**
-       * <code>optional int32 attempt_number = 6;</code>
-       */
-      public Builder clearAttemptNumber() {
-        bitField0_ = (bitField0_ & ~0x00000020);
-        attemptNumber_ = 0;
+  bitField0_ |= 0x00000004;
+        fragmentIdentifierString_ = value;
         onChanged();
         return this;
       }
@@ -12796,52 +12526,51 @@ public final class LlapDaemonProtocolProtos {
       "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" +
       "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d",
       "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" +
-      "\327\002\n\021FragmentSpecProto\022\036\n\026task_attempt_id" +
-      "_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\023\n\013verte" +
-      "x_name\030\003 \001(\t\0224\n\024processor_descriptor\030\004 \001" +
-      "(\0132\026.EntityDescriptorProto\022!\n\013input_spec" +
-      "s\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_specs\030\006 " +
-      "\003(\0132\014.IOSpecProto\0221\n\023grouped_input_specs" +
-      "\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vertex_p" +
-      "arallelism\030\010 \001(\005\022\027\n\017fragment_number\030\t \001(" +
-      "\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023FragmentRu",
-      "ntimeInfo\022#\n\033num_self_and_upstream_tasks" +
-      "\030\001 \001(\005\022-\n%num_self_and_upstream_complete" +
-      "d_tasks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001" +
-      "(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_atte" +
-      "mpt_start_time\030\005 \001(\003\022\"\n\032current_attempt_" +
-      "start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkRequestPr" +
-      "oto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_h" +
-      "ost\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_iden" +
-      "tifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014" +
-      "\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_string\030",
-      "\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfra" +
-      "gment_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025" +
-      "fragment_runtime_info\030\n \001(\0132\024.FragmentRu" +
-      "ntimeInfo\"\031\n\027SubmitWorkResponseProto\"f\n\036" +
-      "SourceStateUpdatedRequestProto\022\020\n\010dag_na" +
-      "me\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(" +
-      "\0162\021.SourceStateProto\"!\n\037SourceStateUpdat" +
-      "edResponseProto\"X\n\031QueryCompleteRequestP" +
-      "roto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t" +
-      "\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryComple",
-      "teResponseProto\"\245\001\n\035TerminateFragmentReq" +
-      "uestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030" +
-      "\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n\013ver" +
-      "tex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 \001(\005\022" +
-      "\026\n\016attempt_number\030\006 \001(\005\" \n\036TerminateFrag" +
-      "mentResponseProto*2\n\020SourceStateProto\022\017\n" +
-      "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022LlapDa" +
-      "emonProtocol\022?\n\nsubmitWork\022\027.SubmitWorkR" +
-      "equestProto\032\030.SubmitWorkResponseProto\022W\n" +
-      "\022sourceStateUpdated\022\037.SourceStateUpdated",
-      "RequestProto\032 .SourceStateUpdatedRespons" +
-      "eProto\022H\n\rqueryComplete\022\032.QueryCompleteR" +
-      "equestProto\032\033.QueryCompleteResponseProto" +
-      "\022T\n\021terminateFragment\022\036.TerminateFragmen" +
-      "tRequestProto\032\037.TerminateFragmentRespons" +
-      "eProtoBH\n&org.apache.hadoop.hive.llap.da" +
-      "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001"
+      "\333\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" +
+      "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\023\n\013v" +
+      "ertex_name\030\003 \001(\t\0224\n\024processor_descriptor" +
+      "\030\004 \001(\0132\026.EntityDescriptorProto\022!\n\013input_" +
+      "specs\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_spec" +
+      "s\030\006 \003(\0132\014.IOSpecProto\0221\n\023grouped_input_s" +
+      "pecs\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vert" +
+      "ex_parallelism\030\010 \001(\005\022\027\n\017fragment_number\030" +
+      "\t \001(\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023Fragme",
+      "ntRuntimeInfo\022#\n\033num_self_and_upstream_t" +
+      "asks\030\001 \001(\005\022-\n%num_self_and_upstream_comp" +
+      "leted_tasks\030\002 \001(\005\022\033\n\023within_dag_priority" +
+      "\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_" +
+      "attempt_start_time\030\005 \001(\003\022\"\n\032current_atte" +
+      "mpt_start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkReque" +
+      "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" +
+      "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" +
+      "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" +
+      " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str",
+      "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" +
+      "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" +
+      "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" +
+      "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" +
+      "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" +
+      "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" +
+      "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" +
+      "pdatedResponseProto\"X\n\031QueryCompleteRequ" +
+      "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" +
+      " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo",
+      "mpleteResponseProto\"g\n\035TerminateFragment" +
+      "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" +
+      "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" +
+      " \001(\t\" \n\036TerminateFragmentResponseProto*2" +
+      "\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS" +
+      "_RUNNING\020\0022\316\002\n\022LlapDaemonProtocol\022?\n\nsub" +
+      "mitWork\022\027.SubmitWorkRequestProto\032\030.Submi" +
+      "tWorkResponseProto\022W\n\022sourceStateUpdated" +
+      "\022\037.SourceStateUpdatedRequestProto\032 .Sour" +
+      "ceStateUpdatedResponseProto\022H\n\rqueryComp",
+      "lete\022\032.QueryCompleteRequestProto\032\033.Query" +
+      "CompleteResponseProto\022T\n\021terminateFragme" +
+      "nt\022\036.TerminateFragmentRequestProto\032\037.Ter" +
+      "minateFragmentResponseProtoBH\n&org.apach" +
+      "e.hadoop.hive.llap.daemon.rpcB\030LlapDaemo" +
+      "nProtocolProtos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12877,7 +12606,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_FragmentSpecProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_FragmentSpecProto_descriptor,
-              new java.lang.String[] { "TaskAttemptIdString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", });
+              new java.lang.String[] { "FragmentIdentifierString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", });
           internal_static_FragmentRuntimeInfo_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_FragmentRuntimeInfo_fieldAccessorTable = new
@@ -12925,7 +12654,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_TerminateFragmentRequestProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_TerminateFragmentRequestProto_descriptor,
-              new java.lang.String[] { "QueryId", "DagName", "DagAttemptNumber", "VertexName", "FragmentNumber", "AttemptNumber", });
+              new java.lang.String[] { "QueryId", "DagName", "FragmentIdentifierString", });
           internal_static_TerminateFragmentResponseProto_descriptor =
             getDescriptor().getMessageTypes().get(13);
           internal_static_TerminateFragmentResponseProto_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index ea2d77a..1620ddf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -101,6 +101,7 @@ public class AMReporter extends AbstractService {
     this.heartbeatInterval =
         conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS,
             LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    LOG.info("AMReporter running with NodeId: {}", nodeId);
   }
 
   @Override
@@ -164,7 +165,9 @@ public class AMReporter extends AbstractService {
     synchronized (knownAppMasters) {
       amNodeInfo = knownAppMasters.get(amNodeId);
       if (amNodeInfo == null) {
-        LOG.error(("Ignoring unexpected unregisterRequest for am at: " + amLocation + ":" + port));
+        // Duplicate unregisters are expected; skip the decrement instead of dereferencing null.
+        LOG.info("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port);
+      } else {
+        amNodeInfo.decrementAndGetTaskCount();
       }
-      amNodeInfo.decrementAndGetTaskCount();
       // Not removing this here. Will be removed when taken off the queue and discovered to have 0

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index d594d6a..2f2ccb0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
-import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -55,11 +54,16 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // TODO Convert this to a CompositeService
 public class ContainerRunnerImpl extends AbstractService implements ContainerRunner, FragmentCompletionHandler {
 
-  private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
+  // TODO Setup a set of threads to process incoming requests.
+  // Make sure requests for a single dag/query are handled by the same thread
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class);
   public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor ";
 
   private volatile AMReporter amReporter;
@@ -143,12 +147,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     LOG.info("Queueing container for execution: " + stringifySubmitRequest(request));
     // This is the start of container-annotated logging.
     // TODO Reduce the length of this string. Way too verbose at the moment.
-    String ndcContextString =
-        request.getContainerIdString() + "_" +
-            request.getFragmentSpec().getDagName() + "_" +
-            request.getFragmentSpec().getVertexName() +
-            "_" + request.getFragmentSpec().getFragmentNumber() + "_" +
-            request.getFragmentSpec().getAttemptNumber();
+    String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString();
     NDC.push(ndcContextString);
     try {
       Map<String, String> env = new HashMap<>();
@@ -158,7 +157,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
 
       FragmentSpecProto fragmentSpec = request.getFragmentSpec();
       TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(
-          fragmentSpec.getTaskAttemptIdString());
+          fragmentSpec.getFragmentIdentifierString());
       int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
 
       QueryFragmentInfo fragmentInfo = queryTracker
@@ -222,7 +221,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
 
   @Override
   public void terminateFragment(TerminateFragmentRequestProto request) {
-    // TODO Implement when this gets used.
+    LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
+    executorService.killFragment(request.getFragmentIdentifierString());
   }
 
   private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
@@ -235,15 +235,15 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
 
   public static String stringifySubmitRequest(SubmitWorkRequestProto request) {
     StringBuilder sb = new StringBuilder();
+    FragmentSpecProto fragmentSpec = request.getFragmentSpec();
     sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort());
+    sb.append(", taskInfo=").append(fragmentSpec.getFragmentIdentifierString());
     sb.append(", user=").append(request.getUser());
     sb.append(", appIdString=").append(request.getApplicationIdString());
     sb.append(", appAttemptNum=").append(request.getAppAttemptNumber());
     sb.append(", containerIdString=").append(request.getContainerIdString());
-    FragmentSpecProto fragmentSpec = request.getFragmentSpec();
     sb.append(", dagName=").append(fragmentSpec.getDagName());
     sb.append(", vertexName=").append(fragmentSpec.getVertexName());
-    sb.append(", taskInfo=").append(fragmentSpec.getTaskAttemptIdString());
     sb.append(", processor=").append(fragmentSpec.getProcessorDescriptor().getClassName());
     sb.append(", numInputs=").append(fragmentSpec.getInputSpecsCount());
     sb.append(", numOutputs=").append(fragmentSpec.getOutputSpecsCount());

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index c3102f9..eb06a2f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -30,4 +30,10 @@ public interface Scheduler<T> {
    * @throws RejectedExecutionException
    */
   void schedule(T t) throws RejectedExecutionException;
+
+  /**
+   * Attempt to kill the fragment with the specified fragmentId
+   * @param fragmentId
+   */
+  void killFragment(String fragmentId);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index bfc4d89..453a71e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -29,6 +28,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -74,6 +74,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
   private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d";
   private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d";
 
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
   // Thread pool for actual execution of work.
   private final ListeningExecutorService executorService;
   private final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
@@ -87,7 +89,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
   private final AtomicInteger numSlotsAvailable;
 
   // Tracks known tasks.
-  private final Set<TaskWrapper> knownTasks = Collections.newSetFromMap(new ConcurrentHashMap<TaskWrapper, Boolean>());
+  private final ConcurrentMap<String, TaskWrapper> knownTasks = new ConcurrentHashMap<>();
 
   private final Object lock = new Object();
 
@@ -131,27 +133,32 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
     @Override
     public void run() {
+
       try {
 
-        synchronized (lock) {
-          while (waitQueue.isEmpty()) {
-            lock.wait();
-          }
-        }
 
-        // Since schedule() can be called from multiple threads, we peek the wait queue,
-        // try scheduling the task and then remove the task if scheduling is successful.
-        // This will make sure the task's place in the wait queue is held until it gets scheduled.
-        while (true) {
+        while (!isShutdown.get()) {
           synchronized (lock) {
+            // Since schedule() can be called from multiple threads, we peek the wait queue,
+            // try scheduling the task and then remove the task if scheduling is successful.
+            // This will make sure the task's place in the wait queue is held until it gets scheduled.
             task = waitQueue.peek();
             if (task == null) {
-              break;
+              if (!isShutdown.get()) {
+                lock.wait();
+              }
+              continue;
             }
 
-          // if the task cannot finish and if no slots are available then don't schedule it.
+            // if the task cannot finish and if no slots are available then don't schedule it.
             boolean shouldWait = false;
             if (task.getTaskRunnerCallable().canFinish()) {
+              if (isDebugEnabled) {
+                LOG.debug(
+                    "Attempting to schedule task {}, canFinish={}. Current state: preemptionQueueSize={}, numSlotsAvailable={}",
+                    task.getRequestId(), task.getTaskRunnerCallable().canFinish(),
+                    preemptionQueue.size(), numSlotsAvailable.get());
+              }
               if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) {
                 shouldWait = true;
               }
@@ -161,7 +168,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
               }
             }
             if (shouldWait) {
-              lock.wait();
+              if (!isShutdown.get()) {
+                lock.wait();
+              }
               // Another task at a higher priority may have come in during the wait. Lookup the
               // queue again to pick up the task at the highest priority.
               continue;
@@ -179,15 +188,20 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
           synchronized (lock) {
-            while (waitQueue.isEmpty()) {
+            // Stop waiting once shutdown is requested; skipping only the wait() would make this
+            // loop spin forever while holding the lock.
+            while (waitQueue.isEmpty() && !isShutdown.get()) {
               lock.wait();
             }
           }
         }
 
       } catch (InterruptedException e) {
-        // Executor service will create new thread if the current thread gets interrupted. We don't
-        // need to do anything with the exception.
-        LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted.");
+        if (isShutdown.get()) {
+          LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted after shutdown.");
+        } else {
+          LOG.warn(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " interrupted without shutdown", e);
+          throw new RuntimeException(e);
+        }
       }
     }
   }
@@ -207,24 +221,23 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
   @Override
   public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
-    TaskWrapper taskWrapper = new TaskWrapper(task);
-    knownTasks.add(taskWrapper);
+    TaskWrapper taskWrapper = new TaskWrapper(task, this);
+    knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
     TaskWrapper evictedTask;
     try {
       // Don't need a lock. Not subscribed for notifications yet, and marked as inWaitQueue
       evictedTask = waitQueue.offer(taskWrapper);
     } catch (RejectedExecutionException e) {
-      knownTasks.remove(taskWrapper);
+      knownTasks.remove(taskWrapper.getRequestId());
       throw e;
     }
-    if (evictedTask == null) {
-      if (isInfoEnabled) {
-        LOG.info(task.getRequestId() + " added to wait queue.");
-      }
-      if (isDebugEnabled) {
-        LOG.debug("Wait Queue: {}", waitQueue);
-      }
-    } else {
+    if (isInfoEnabled) {
+      LOG.info("{} added to wait queue. Current wait queue size={}", task.getRequestId(), waitQueue.size());
+    }
+    if (isDebugEnabled) {
+      LOG.debug("Wait Queue: {}", waitQueue);
+    }
+    if (evictedTask != null) {
       evictedTask.maybeUnregisterForFinishedStateNotifications();
       evictedTask.getTaskRunnerCallable().killTask();
       if (isInfoEnabled) {
@@ -237,13 +250,41 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
     }
   }
 
+  @Override
+  public void killFragment(String fragmentId) {
+    synchronized (lock) {
+      TaskWrapper taskWrapper = knownTasks.remove(fragmentId);
+      // Can be null since the task may have completed meanwhile.
+      if (taskWrapper != null) {
+        if (taskWrapper.inWaitQueue) {
+          if (isDebugEnabled) {
+            LOG.debug("Removing {} from waitQueue", fragmentId);
+          }
+          taskWrapper.setIsInWaitQueue(false);
+          waitQueue.remove(taskWrapper);
+        }
+        if (taskWrapper.inPreemptionQueue) {
+          if (isDebugEnabled) {
+            LOG.debug("Removing {} from preemptionQueue", fragmentId);
+          }
+          taskWrapper.setIsInPreemptableQueue(false);
+          preemptionQueue.remove(taskWrapper);
+        }
+        taskWrapper.getTaskRunnerCallable().killTask();
+      } else {
+        LOG.info("Ignoring killFragment request for {} since it isn't known", fragmentId);
+      }
+      lock.notify();
+    }
+  }
+
   private boolean trySchedule(final TaskWrapper taskWrapper) {
 
     boolean scheduled = false;
     try {
       synchronized (lock) {
         boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
-        boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+        boolean stateChanged  = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
         ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable());
         taskWrapper.setIsInWaitQueue(false);
         FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(taskWrapper);
@@ -357,7 +398,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
     @Override
     public void onSuccess(TaskRunner2Result result) {
-      knownTasks.remove(taskWrapper);
+      knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
       taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
@@ -366,7 +407,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
     @Override
     public void onFailure(Throwable t) {
-      knownTasks.remove(taskWrapper);
+      knownTasks.remove(taskWrapper.getRequestId());
       taskWrapper.setIsInPreemptableQueue(false);
       taskWrapper.maybeUnregisterForFinishedStateNotifications();
       taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
@@ -387,6 +428,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
       }
 
       numSlotsAvailable.incrementAndGet();
+      LOG.info("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}",
+          taskWrapper.getRequestId(), waitQueue.size(), numSlotsAvailable.get(),
+          preemptionQueue.size());
       synchronized (lock) {
         if (!waitQueue.isEmpty()) {
           lock.notify();
@@ -398,21 +442,23 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
   // TODO: llap daemon should call this to gracefully shutdown the task executor service
   public void shutDown(boolean awaitTermination) {
-    if (awaitTermination) {
-      if (isDebugEnabled) {
-        LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" +
-            " service gracefully");
-      }
-      shutdownExecutor(waitQueueExecutorService);
-      shutdownExecutor(executorService);
-      shutdownExecutor(executionCompletionExecutorService);
-    } else {
-      if (isDebugEnabled) {
-        LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" +
-            " service immediately");
+    if (!isShutdown.getAndSet(true)) {
+      if (awaitTermination) {
+        if (isDebugEnabled) {
+          LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" +
+              " service gracefully");
+        }
+        shutdownExecutor(waitQueueExecutorService);
+        shutdownExecutor(executorService);
+        shutdownExecutor(executionCompletionExecutorService);
+      } else {
+        if (isDebugEnabled) {
+          LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" +
+              " service immediately");
+        }
+        executorService.shutdownNow();
+        waitQueueExecutorService.shutdownNow();
       }
-      executorService.shutdownNow();
-      waitQueueExecutorService.shutdownNow();
     }
   }
 
@@ -474,14 +520,16 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
   }
 
 
-  private class TaskWrapper implements FinishableStateUpdateHandler {
+  public static class TaskWrapper implements FinishableStateUpdateHandler {
     private final TaskRunnerCallable taskRunnerCallable;
     private boolean inWaitQueue = true;
     private boolean inPreemptionQueue = false;
     private boolean registeredForNotifications = false;
+    private final TaskExecutorService taskExecutorService;
 
-    public TaskWrapper(TaskRunnerCallable taskRunnerCallable) {
+    public TaskWrapper(TaskRunnerCallable taskRunnerCallable, TaskExecutorService taskExecutorService) {
       this.taskRunnerCallable = taskRunnerCallable;
+      this.taskExecutorService = taskExecutorService;
     }
 
     // Methods are synchronized primarily for visibility.
@@ -548,7 +596,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
       // Meanwhile the scheduler could try updating states via a synchronized method.
       LOG.info("DEBUG: Received finishable state update for {}, state={}",
           taskRunnerCallable.getRequestId(), finishableState);
-      TaskExecutorService.this.finishableStateUpdated(this, finishableState);
+      taskExecutorService.finishableStateUpdated(this, finishableState);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 007c83d..1c12e12 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -98,6 +98,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
+  private final AtomicBoolean isStarted = new AtomicBoolean(false); // set by callInternal()
   private final AtomicBoolean isCompleted = new AtomicBoolean(false);
   private final AtomicBoolean killInvoked = new AtomicBoolean(false);
 
@@ -127,13 +128,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         request.getUser(), jobToken);
     }
     this.metrics = metrics;
-    this.requestId = getTaskAttemptId(request);
+    this.requestId = getRequestId(request);
     this.killedTaskHandler = killedTaskHandler;
     this.fragmentCompletionHanler = fragmentCompleteHandler;
   }
 
   @Override
   protected TaskRunner2Result callInternal() throws Exception {
+    isStarted.set(true);
+
     this.startTime = System.currentTimeMillis();
     this.threadName = Thread.currentThread().getName();
     if (LOG.isDebugEnabled()) {
@@ -143,12 +146,19 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     // Unregister from the AMReporter, since the task is now running.
     this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
 
+    synchronized (this) { // shouldRunTask may have been cleared by a kill before we got here
+      if (!shouldRunTask) {
+        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+      }
+    }
+
     // TODO This executor seems unnecessary. Here and TezChild
     ExecutorService executorReal = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat(
-                "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString())
+                "TezTaskRunner_" + request.getFragmentSpec().getFragmentIdentifierString())
             .build());
     executor = MoreExecutors.listeningDecorator(executorReal);
 
@@ -244,6 +254,16 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
                  taskSpec.getTaskAttemptID());
             }
             shouldRunTask = false;
+          } else {
+            // The task was killed before it started running: report the kill back to the AM.
+            LOG.info("DBG: Reporting taskKilled for non-started fragment {}", getRequestId());
+            reportTaskKilled();
+          }
+          if (!isStarted.get()) {
+            // Not started: complete the fragment and unregister from the AMReporter now, since the
+            // callable may never run (if it does, callInternal() unregisters again - TODO confirm ok).
+            fragmentCompletionHanler.fragmentComplete(fragmentInfo);
+            this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
           }
         }
       } else {
@@ -360,7 +380,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
           metrics.incrExecutorTotalSuccess();
           break;
         case CONTAINER_STOP_REQUESTED:
-          LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId);
+          LOG.info("Received container stop request (AM preemption) for {}", requestId);
           break;
         case KILL_REQUESTED:
           LOG.info("Killed task {}", requestId);
@@ -439,8 +459,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     return sb.toString();
   }
 
-  private String getTaskAttemptId(SubmitWorkRequestProto request) {
-    return request.getFragmentSpec().getTaskAttemptIdString();
+  private static String getRequestId(SubmitWorkRequestProto request) {
+    return request.getFragmentSpec().getFragmentIdentifierString();
   }
 
   public long getFirstAttemptStartTime() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
index 5bd1fe9..7428a6a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java
@@ -43,7 +43,7 @@ public class Converters {
 
   public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) {
     TezTaskAttemptID taskAttemptID =
-        TezTaskAttemptID.fromString(FragmentSpecProto.getTaskAttemptIdString());
+        TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString()); // attempt id
 
     ProcessorDescriptor processorDescriptor = null;
     if (FragmentSpecProto.hasProcessorDescriptor()) {
@@ -83,7 +83,7 @@ public class Converters {
 
   public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
     FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder();
-    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString()); // attempt id string
     builder.setDagName(taskSpec.getDAGName());
     builder.setVertexName(taskSpec.getVertexName());
     builder.setVertexParallelism(taskSpec.getVertexParallelism());

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 6abd706..6a38d85 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.ContainerEndReason;
 import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -172,8 +175,15 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId) {
-    super.registerContainerEnd(containerId);
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+    super.registerContainerEnd(containerId, endReason);
+    if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) { // look up before unregisterContainer()
+      LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId);
+      TezTaskAttemptID taskAttemptId = entityTracker.getTaskAttemptIdForContainer(containerId);
+      if (taskAttemptId != null) { // the container may have no attempt mapped
+        sendTaskTerminated(taskAttemptId, true);
+      }
+    }
     entityTracker.unregisterContainer(containerId);
   }
 
@@ -224,7 +234,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
     getTaskCommunicatorContext()
         .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
-    communicator.submitWork(requestProto, host, port,
+    communicator.sendSubmitWork(requestProto, host, port,
         new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
           @Override
           public void setResponse(SubmitWorkResponseProto response) {
@@ -238,7 +248,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
               t = se.getCause();
             }
             if (t instanceof RemoteException) {
-              RemoteException re = (RemoteException)t;
+              RemoteException re = (RemoteException) t; // toString() is matched below
               String message = re.toString();
               // RejectedExecutions from the remote service treated as KILLED
               if (message.contains(RejectedExecutionException.class.getName())) {
@@ -249,7 +259,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
                     TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
               } else {
                 // All others from the remote service cause the task to FAIL.
-                LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
+                LOG.info(
+                    "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                        containerId, t);
                 getTaskCommunicatorContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.toString());
@@ -264,7 +276,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
               } else {
                 // Anything else is a FAIL.
-                LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
+                LOG.info(
+                    "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                        containerId, t);
                 getTaskCommunicatorContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.getMessage());
@@ -275,14 +289,50 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   }
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
-    super.unregisterRunningTaskAttempt(taskAttemptID);
-    entityTracker.unregisterTaskAttempt(taskAttemptID);
+  public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId,
+                                           TaskAttemptEndReason endReason) {
+    super.unregisterRunningTaskAttempt(taskAttemptId, endReason);
+
+    if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) {
+      LOG.info("Processing taskEnd for task {} caused by internal preemption", taskAttemptId);
+      sendTaskTerminated(taskAttemptId, false);
+    }
+    entityTracker.unregisterTaskAttempt(taskAttemptId);
     // This will also be invoked for tasks which have been KILLED / rejected by the daemon.
     // Informing the daemon becomes necessary once the LlapScheduler supports preemption
     // and/or starts attempting to kill tasks which may be running on a node.
   }
 
+  // Best-effort: asks the daemon running taskAttemptId to terminate the fragment. Used when an
+  // attempt or its container ends due to internal preemption; only logs if the node is unknown.
+  private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
+                                  boolean invokedByContainerEnd) {
+    LOG.info("DBG: Attempting to send terminateRequest for fragment {} due to internal preemption"
+        + " invoked by {}", taskAttemptId, invokedByContainerEnd ? "containerEnd" : "taskEnd");
+    LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId);
+    // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
+    if (nodeId != null) {
+      TerminateFragmentRequestProto request =
+          TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName)
+              .setFragmentIdentifierString(taskAttemptId.toString()).build();
+      communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
+          new TaskCommunicator.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+            @Override
+            public void setResponse(TerminateFragmentResponseProto response) {
+              // Nothing to do: the response is not inspected.
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.warn("Failed to send terminate fragment request for {}", taskAttemptId, t);
+            }
+          });
+    } else {
+      LOG.info("Not sending terminate request for fragment {} since it's node is not known. "
+          + "Already unregistered", taskAttemptId);
+    }
+  }
+
   @Override
   public void dagComplete(final String dagName) {
     QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName(
@@ -410,7 +460,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     public void taskKilled(TezTaskAttemptID taskAttemptId) {
       // TODO Unregister the task for state updates, which could in turn unregister the node.
       getTaskCommunicatorContext().taskKilled(taskAttemptId,
-          TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM, "Attempt preempted");
+          TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted"); // presumably a daemon-side kill
       entityTracker.unregisterTaskAttempt(taskAttemptId);
     }
 
@@ -480,6 +530,42 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
     }
 
+    /** Returns the node recorded for the container, or null if it is not registered. */
+    LlapNodeId getNodeIdForContainer(ContainerId containerId) {
+      return containerToNodeMap.get(containerId);
+    }
+
+    /** Returns the node recorded for the attempt, or null if it is not registered. */
+    LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) {
+      return attemptToNodeMap.get(taskAttemptId);
+    }
+
+    /**
+     * Returns the container the given attempt is mapped to, or null if the attempt, its node, or
+     * the node's container map is not (or no longer) tracked.
+     */
+    ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
+      LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId);
+      if (llapNodeId == null) {
+        return null;
+      }
+      // nodeMap may have no entry for the node: check before calling inverse() to avoid an NPE.
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      return bMap == null ? null : bMap.inverse().get(taskAttemptId);
+    }
+
+    /**
+     * Returns the attempt mapped to the given container, or null if none is tracked.
+     */
+    TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
+      LlapNodeId llapNodeId = getNodeIdForContainer(containerId);
+      if (llapNodeId == null) {
+        return null;
+      }
+      BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
+      return bMap == null ? null : bMap.get(containerId);
+    }
+
     void unregisterContainer(ContainerId containerId) {
       LlapNodeId llapNodeId = containerToNodeMap.remove(containerId);
       if (llapNodeId == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index cec17f9..d357d61 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
@@ -84,21 +86,10 @@ public class TaskCommunicator extends AbstractService {
     executor.shutdownNow();
   }
 
-  public void submitWork(SubmitWorkRequestProto request, String host, int port,
+  public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port,
                          final ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
     ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(host, port, request));
-    Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() {
-      @Override
-      public void onSuccess(SubmitWorkResponseProto result) {
-        callback.setResponse(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        callback.indicateError(t);
-      }
-    });
-
+    Futures.addCallback(future, new ResponseCallback<SubmitWorkResponseProto>(callback));
   }
 
   public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host,
@@ -106,17 +97,7 @@ public class TaskCommunicator extends AbstractService {
                                     final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
     ListenableFuture<SourceStateUpdatedResponseProto> future =
         executor.submit(new SendSourceStateUpdateCallable(host, port, request));
-    Futures.addCallback(future, new FutureCallback<SourceStateUpdatedResponseProto>() {
-      @Override
-      public void onSuccess(SourceStateUpdatedResponseProto result) {
-        callback.setResponse(result);
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        callback.indicateError(t);
-      }
-    });
+    Futures.addCallback(future, new ResponseCallback<SourceStateUpdatedResponseProto>(callback));
   }
 
   public void sendQueryComplete(final QueryCompleteRequestProto request, final String host,
@@ -124,17 +105,34 @@ public class TaskCommunicator extends AbstractService {
                                 final ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
     ListenableFuture<QueryCompleteResponseProto> future =
         executor.submit(new SendQueryCompleteCallable(host, port, request));
-    Futures.addCallback(future, new FutureCallback<QueryCompleteResponseProto>() {
-      @Override
-      public void onSuccess(QueryCompleteResponseProto result) {
-        callback.setResponse(result);
-      }
+    Futures.addCallback(future, new ResponseCallback<QueryCompleteResponseProto>(callback));
+  }
 
-      @Override
-      public void onFailure(Throwable t) {
-        callback.indicateError(t);
-      }
-    });
+  public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host,
+                                    final int port,
+                                    final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
+    ListenableFuture<TerminateFragmentResponseProto> future =
+        executor.submit(new SendTerminateFragmentCallable(host, port, request));
+    Futures.addCallback(future, new ResponseCallback<TerminateFragmentResponseProto>(callback));
+  }
+
+  // Forwards a future's outcome to an ExecuteRequestCallback (setResponse / indicateError).
+  private static class ResponseCallback<TYPE extends Message> implements FutureCallback<TYPE> {
+    private final ExecuteRequestCallback<TYPE> callback;
+
+    public ResponseCallback(ExecuteRequestCallback<TYPE> callback) {
+      this.callback = callback;
+    }
+
+    @Override
+    public void onSuccess(TYPE result) {
+      callback.setResponse(result);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      callback.indicateError(t);
+    }
   }
 
   private static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message>
@@ -195,6 +193,20 @@ public class TaskCommunicator extends AbstractService {
     }
   }
 
+  private class SendTerminateFragmentCallable
+      extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> {
+    // Issues the terminateFragment RPC through the proxy for hostname:port.
+    protected SendTerminateFragmentCallable(String hostname, int port,
+                                            TerminateFragmentRequestProto terminateFragmentRequestProto) {
+      super(hostname, port, terminateFragmentRequestProto);
+    }
+
+    @Override
+    public TerminateFragmentResponseProto call() throws Exception {
+      return getProxy(hostname, port).terminateFragment(null, request);
+    }
+  }
+
   public interface ExecuteRequestCallback<T extends Message> {
     void setResponse(T response);
     void indicateError(Throwable t);