You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/12/18 23:00:47 UTC

hive git commit: HIVE-12658: Task rejection by an llap daemon spams the log with RejectedExecutionExceptions (Prasanth Jayachandran reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/branch-2.0 a17c95e04 -> 1d5e9c96c


HIVE-12658: Task rejection by an llap daemon spams the log with RejectedExecutionExceptions (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d5e9c96
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d5e9c96
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d5e9c96

Branch: refs/heads/branch-2.0
Commit: 1d5e9c96ced95f2c897f83937fd65cfa06bd312d
Parents: a17c95e
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Fri Dec 18 13:45:02 2015 -0600
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Fri Dec 18 16:00:31 2015 -0600

----------------------------------------------------------------------
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 233 +++++++++++++++++--
 .../hive/llap/daemon/ContainerRunner.java       |  13 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  42 +++-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  22 +-
 .../impl/LlapDaemonProtocolServerImpl.java      |  12 +-
 .../hadoop/hive/llap/daemon/impl/Scheduler.java |  11 +-
 .../llap/daemon/impl/TaskExecutorService.java   |  28 ++-
 .../llap/tezplugins/LlapTaskCommunicator.java   |  41 ++--
 .../src/protobuf/LlapDaemonProtocol.proto       |   7 +
 .../impl/TestLlapDaemonProtocolServerImpl.java  |  19 +-
 .../daemon/impl/TestTaskExecutorService.java    |  31 +--
 11 files changed, 355 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index af009b8..d2180e5 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -90,6 +90,97 @@ public final class LlapDaemonProtocolProtos {
     // @@protoc_insertion_point(enum_scope:SourceStateProto)
   }
 
+  /**
+   * Protobuf enum {@code SubmissionStateProto}
+   */
+  public enum SubmissionStateProto
+      implements com.google.protobuf.ProtocolMessageEnum {
+    /**
+     * <code>ACCEPTED = 1;</code>
+     */
+    ACCEPTED(0, 1),
+    /**
+     * <code>REJECTED = 2;</code>
+     */
+    REJECTED(1, 2),
+    /**
+     * <code>EVICTED_OTHER = 3;</code>
+     */
+    EVICTED_OTHER(2, 3),
+    ;
+
+    /**
+     * <code>ACCEPTED = 1;</code>
+     */
+    public static final int ACCEPTED_VALUE = 1;
+    /**
+     * <code>REJECTED = 2;</code>
+     */
+    public static final int REJECTED_VALUE = 2;
+    /**
+     * <code>EVICTED_OTHER = 3;</code>
+     */
+    public static final int EVICTED_OTHER_VALUE = 3;
+
+
+    public final int getNumber() { return value; }
+
+    public static SubmissionStateProto valueOf(int value) {
+      switch (value) {
+        case 1: return ACCEPTED;
+        case 2: return REJECTED;
+        case 3: return EVICTED_OTHER;
+        default: return null;
+      }
+    }
+
+    public static com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>
+        internalGetValueMap() {
+      return internalValueMap;
+    }
+    private static com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>
+        internalValueMap =
+          new com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>() {
+            public SubmissionStateProto findValueByNumber(int number) {
+              return SubmissionStateProto.valueOf(number);
+            }
+          };
+
+    public final com.google.protobuf.Descriptors.EnumValueDescriptor
+        getValueDescriptor() {
+      return getDescriptor().getValues().get(index);
+    }
+    public final com.google.protobuf.Descriptors.EnumDescriptor
+        getDescriptorForType() {
+      return getDescriptor();
+    }
+    public static final com.google.protobuf.Descriptors.EnumDescriptor
+        getDescriptor() {
+      return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getEnumTypes().get(1);
+    }
+
+    private static final SubmissionStateProto[] VALUES = values();
+
+    public static SubmissionStateProto valueOf(
+        com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+      if (desc.getType() != getDescriptor()) {
+        throw new java.lang.IllegalArgumentException(
+          "EnumValueDescriptor is not for this type.");
+      }
+      return VALUES[desc.getIndex()];
+    }
+
+    private final int index;
+    private final int value;
+
+    private SubmissionStateProto(int index, int value) {
+      this.index = index;
+      this.value = value;
+    }
+
+    // @@protoc_insertion_point(enum_scope:SubmissionStateProto)
+  }
+
   public interface UserPayloadProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -8265,6 +8356,16 @@ public final class LlapDaemonProtocolProtos {
 
   public interface SubmitWorkResponseProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // optional .SubmissionStateProto submission_state = 1;
+    /**
+     * <code>optional .SubmissionStateProto submission_state = 1;</code>
+     */
+    boolean hasSubmissionState();
+    /**
+     * <code>optional .SubmissionStateProto submission_state = 1;</code>
+     */
+    org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState();
   }
   /**
    * Protobuf type {@code SubmitWorkResponseProto}
@@ -8299,6 +8400,7 @@ public final class LlapDaemonProtocolProtos {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -8316,6 +8418,17 @@ public final class LlapDaemonProtocolProtos {
               }
               break;
             }
+            case 8: {
+              int rawValue = input.readEnum();
+              org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.valueOf(rawValue);
+              if (value == null) {
+                unknownFields.mergeVarintField(1, rawValue);
+              } else {
+                bitField0_ |= 0x00000001;
+                submissionState_ = value;
+              }
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8355,7 +8468,25 @@ public final class LlapDaemonProtocolProtos {
       return PARSER;
     }
 
+    private int bitField0_;
+    // optional .SubmissionStateProto submission_state = 1;
+    public static final int SUBMISSION_STATE_FIELD_NUMBER = 1;
+    private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_;
+    /**
+     * <code>optional .SubmissionStateProto submission_state = 1;</code>
+     */
+    public boolean hasSubmissionState() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional .SubmissionStateProto submission_state = 1;</code>
+     */
+    public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() {
+      return submissionState_;
+    }
+
     private void initFields() {
+      submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8369,6 +8500,9 @@ public final class LlapDaemonProtocolProtos {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeEnum(1, submissionState_.getNumber());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -8378,6 +8512,10 @@ public final class LlapDaemonProtocolProtos {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(1, submissionState_.getNumber());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -8401,6 +8539,11 @@ public final class LlapDaemonProtocolProtos {
       org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto) obj;
 
       boolean result = true;
+      result = result && (hasSubmissionState() == other.hasSubmissionState());
+      if (hasSubmissionState()) {
+        result = result &&
+            (getSubmissionState() == other.getSubmissionState());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -8414,6 +8557,10 @@ public final class LlapDaemonProtocolProtos {
       }
       int hash = 41;
       hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasSubmissionState()) {
+        hash = (37 * hash) + SUBMISSION_STATE_FIELD_NUMBER;
+        hash = (53 * hash) + hashEnum(getSubmissionState());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -8523,6 +8670,8 @@ public final class LlapDaemonProtocolProtos {
 
       public Builder clear() {
         super.clear();
+        submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -8549,6 +8698,13 @@ public final class LlapDaemonProtocolProtos {
 
       public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto buildPartial() {
         org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.submissionState_ = submissionState_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -8564,6 +8720,9 @@ public final class LlapDaemonProtocolProtos {
 
       public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other) {
         if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance()) return this;
+        if (other.hasSubmissionState()) {
+          setSubmissionState(other.getSubmissionState());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -8589,6 +8748,43 @@ public final class LlapDaemonProtocolProtos {
         }
         return this;
       }
+      private int bitField0_;
+
+      // optional .SubmissionStateProto submission_state = 1;
+      private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+      /**
+       * <code>optional .SubmissionStateProto submission_state = 1;</code>
+       */
+      public boolean hasSubmissionState() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional .SubmissionStateProto submission_state = 1;</code>
+       */
+      public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() {
+        return submissionState_;
+      }
+      /**
+       * <code>optional .SubmissionStateProto submission_state = 1;</code>
+       */
+      public Builder setSubmissionState(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000001;
+        submissionState_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional .SubmissionStateProto submission_state = 1;</code>
+       */
+      public Builder clearSubmissionState() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:SubmitWorkResponseProto)
     }
@@ -13565,30 +13761,33 @@ public final class LlapDaemonProtocolProtos {
       "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" +
       "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" +
       "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" +
-      "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" +
-      "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" +
-      "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" +
-      "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" +
-      "pdatedResponseProto\"X\n\031QueryCompleteRequ" +
-      "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" +
-      " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo",
-      "mpleteResponseProto\"g\n\035TerminateFragment" +
-      "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" +
-      "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" +
-      " \001(\t\" \n\036TerminateFragmentResponseProto\"\026" +
-      "\n\024GetTokenRequestProto\"&\n\025GetTokenRespon" +
-      "seProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStatePro" +
-      "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022L" +
+      "ntRuntimeInfo\"J\n\027SubmitWorkResponseProto" +
+      "\022/\n\020submission_state\030\001 \001(\0162\025.SubmissionS" +
+      "tateProto\"f\n\036SourceStateUpdatedRequestPr" +
+      "oto\022\020\n\010dag_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022" +
+      " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" +
+      "rceStateUpdatedResponseProto\"X\n\031QueryCom" +
+      "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010d",
+      "ag_name\030\002 \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034" +
+      "\n\032QueryCompleteResponseProto\"g\n\035Terminat" +
+      "eFragmentRequestProto\022\020\n\010query_id\030\001 \001(\t\022" +
+      "\020\n\010dag_name\030\002 \001(\t\022\"\n\032fragment_identifier" +
+      "_string\030\007 \001(\t\" \n\036TerminateFragmentRespon" +
+      "seProto\"\026\n\024GetTokenRequestProto\"&\n\025GetTo" +
+      "kenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020Sourc" +
+      "eStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNIN" +
+      "G\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTED\020" +
+      "\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022L",
       "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" +
       "WorkRequestProto\032\030.SubmitWorkResponsePro" +
-      "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp",
+      "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp" +
       "datedRequestProto\032 .SourceStateUpdatedRe" +
       "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" +
       "leteRequestProto\032\033.QueryCompleteResponse" +
       "Proto\022T\n\021terminateFragment\022\036.TerminateFr" +
       "agmentRequestProto\032\037.TerminateFragmentRe" +
       "sponseProto2]\n\026LlapManagementProtocol\022C\n" +
-      "\022getDelegationToken\022\025.GetTokenRequestPro" +
+      "\022getDelegationToken\022\025.GetTokenRequestPro",
       "to\032\026.GetTokenResponseProtoBH\n&org.apache" +
       ".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" +
       "ProtocolProtos\210\001\001\240\001\001"
@@ -13645,7 +13844,7 @@ public final class LlapDaemonProtocolProtos {
           internal_static_SubmitWorkResponseProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_SubmitWorkResponseProto_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "SubmissionState", });
           internal_static_SourceStateUpdatedRequestProto_descriptor =
             getDescriptor().getMessageTypes().get(8);
           internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index f3ce33b..fc29371 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -16,19 +16,22 @@ package org.apache.hadoop.hive.llap.daemon;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 
 public interface ContainerRunner {
 
-  void submitWork(SubmitWorkRequestProto request) throws IOException;
+  SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException;
 
-  void sourceStateUpdated(SourceStateUpdatedRequestProto request);
+  SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request);
 
-  void queryComplete(QueryCompleteRequestProto request);
+  QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request);
 
-  void terminateFragment(TerminateFragmentRequestProto request);
+  TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2139bb0..0d85671 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -22,13 +22,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
@@ -39,9 +37,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
@@ -62,7 +65,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-
 // TODO Convert this to a CompositeService
 public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
 
@@ -145,7 +147,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   }
 
   @Override
-  public void submitWork(SubmitWorkRequestProto request) throws IOException {
+  public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
     HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
         localAddress.get().getHostName(), request.getFragmentSpec().getDagName(),
         request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
@@ -157,6 +159,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     // TODO Reduce the length of this string. Way too verbose at the moment.
     String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString();
     NDC.push(ndcContextString);
+    Scheduler.SubmissionState submissionState;
+    SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
     try {
       Map<String, String> env = new HashMap<>();
       // TODO What else is required in this environment map.
@@ -191,7 +195,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
       Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
-      LOG.debug("Registering request with the ShuffleHandler");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registering request with the ShuffleHandler");
+      }
       ShuffleHandler.get()
           .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
               request.getUser(), localDirs);
@@ -200,18 +206,27 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
           this);
-      try {
-        executorService.schedule(callable);
-      } catch (RejectedExecutionException e) {
+      submissionState = executorService.schedule(callable);
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("SubmissionState for {} : {} ", ndcContextString, submissionState);
+      }
+
+      if (submissionState.equals(Scheduler.SubmissionState.REJECTED)) {
         // Stop tracking the fragment and re-throw the error.
         fragmentComplete(fragmentInfo);
-        throw e;
+        return responseBuilder
+            .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()))
+            .build();
       }
       metrics.incrExecutorTotalRequestsHandled();
       metrics.incrExecutorNumQueuedRequests();
     } finally {
       NDC.pop();
     }
+
+    responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
+    return responseBuilder.build();
   }
 
   private static class LlapExecutionContext extends ExecutionContextImpl
@@ -230,14 +245,15 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   }
 
   @Override
-  public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+  public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
     LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
     queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
         request.getState());
+    return SourceStateUpdatedResponseProto.getDefaultInstance();
   }
 
   @Override
-  public void queryComplete(QueryCompleteRequestProto request) {
+  public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
     LOG.info("Processing queryComplete notification for {}", request.getDagName());
     List<QueryFragmentInfo> knownFragments =
         queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
@@ -248,12 +264,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           fragmentInfo.getFragmentIdentifierString());
       executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
     }
+    return QueryCompleteResponseProto.getDefaultInstance();
   }
 
   @Override
-  public void terminateFragment(TerminateFragmentRequestProto request) {
+  public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
     LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
     executorService.killFragment(request.getFragmentIdentifierString());
+    return TerminateFragmentResponseProto.getDefaultInstance();
   }
 
   private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index e1ecf64..467ab71 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -32,16 +32,20 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.service.CompositeService;
@@ -313,25 +317,25 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   }
 
   @Override
-  public void submitWork(SubmitWorkRequestProto request) throws
+  public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
       IOException {
     numSubmissions.incrementAndGet();
-    containerRunner.submitWork(request);
+    return containerRunner.submitWork(request);
   }
 
   @Override
-  public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
-    containerRunner.sourceStateUpdated(request);
+  public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+    return containerRunner.sourceStateUpdated(request);
   }
 
   @Override
-  public void queryComplete(QueryCompleteRequestProto request) {
-    containerRunner.queryComplete(request);
+  public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
+    return containerRunner.queryComplete(request);
   }
 
   @Override
-  public void terminateFragment(TerminateFragmentRequestProto request) {
-    containerRunner.terminateFragment(request);
+  public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
+    return containerRunner.terminateFragment(request);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index db0b752..f87fffe 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -93,33 +93,29 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
                                             SubmitWorkRequestProto request) throws
       ServiceException {
     try {
-      containerRunner.submitWork(request);
+      return containerRunner.submitWork(request);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
-    return SubmitWorkResponseProto.getDefaultInstance();
   }
 
   @Override
   public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
       SourceStateUpdatedRequestProto request) throws ServiceException {
-    containerRunner.sourceStateUpdated(request);
-    return SourceStateUpdatedResponseProto.getDefaultInstance();
+    return containerRunner.sourceStateUpdated(request);
   }
 
   @Override
   public QueryCompleteResponseProto queryComplete(RpcController controller,
       QueryCompleteRequestProto request) throws ServiceException {
-    containerRunner.queryComplete(request);
-    return QueryCompleteResponseProto.getDefaultInstance();
+    return containerRunner.queryComplete(request);
   }
 
   @Override
   public TerminateFragmentResponseProto terminateFragment(
       RpcController controller,
       TerminateFragmentRequestProto request) throws ServiceException {
-    containerRunner.terminateFragment(request);
-    return TerminateFragmentResponseProto.getDefaultInstance();
+    return containerRunner.terminateFragment(request);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index 1d35b10..26c8e55 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -18,19 +18,24 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
 
 /**
  * Task scheduler interface
  */
 public interface Scheduler<T> {
 
+  enum SubmissionState {
+    ACCEPTED, // request accepted
+    REJECTED, // request rejected as wait queue is full
+    EVICTED_OTHER; // request accepted but evicted other low priority task
+  }
+
   /**
-  * Schedule the task or throw RejectedExecutionException if queues are full
+  * Schedule the task and return the resulting SubmissionState (REJECTED when queues are full)
    * @param t - task to schedule
-   * @throws RejectedExecutionException
+   * @return SubmissionState
    */
-  void schedule(T t) throws RejectedExecutionException;
+  SubmissionState schedule(T t);
 
   /**
    * Attempt to kill the fragment with the specified fragmentId

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5e2c6dd..34aa5c9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -70,7 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * run to completion immediately (canFinish = false) are added to pre-emption queue.
  * <p/>
  * When all the executor threads are occupied and wait queue is full, the task scheduler will
- * throw RejectedExecutionException.
+ * return SubmissionState.REJECTED response
  * <p/>
  * Task executor service can be shut down which will terminated all running tasks and reject all
  * new tasks. Shutting down of the task executor service can be done gracefully or immediately.
@@ -316,9 +316,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
   }
 
   @Override
-  public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
+  public SubmissionState schedule(TaskRunnerCallable task) {
     TaskWrapper taskWrapper = new TaskWrapper(task, this);
-
+    SubmissionState result;
     TaskWrapper evictedTask;
     synchronized (lock) {
       // If the queue does not have capacity, it does not throw a Rejection. Instead it will
@@ -328,19 +328,35 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
       // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
       // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
       evictedTask = waitQueue.offer(taskWrapper);
-      if (evictedTask != taskWrapper) {
+
+      // null evicted task means offer accepted
+      // evictedTask is not equal taskWrapper means current task is accepted and it evicted
+      // some other task
+      if (evictedTask == null || evictedTask != taskWrapper) {
         knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
         taskWrapper.setIsInWaitQueue(true);
         if (isDebugEnabled) {
           LOG.debug("{} added to wait queue. Current wait queue size={}", task.getRequestId(),
               waitQueue.size());
         }
+
+        result = evictedTask == null ? SubmissionState.ACCEPTED : SubmissionState.EVICTED_OTHER;
+
+        if (isDebugEnabled && evictedTask != null) {
+          LOG.debug("Eviction: {} {} {}", taskWrapper, result, evictedTask);
+        }
       } else {
         if (isInfoEnabled) {
           LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId());
         }
         evictedTask.getTaskRunnerCallable().killTask();
-        throw new RejectedExecutionException("Wait queue full");
+
+        result = SubmissionState.REJECTED;
+
+        if (isDebugEnabled) {
+          LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result);
+        }
+        return result;
       }
     }
 
@@ -371,6 +387,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     synchronized (lock) {
       lock.notify();
     }
+
+    return result;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index ce248e9..9d47940 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -35,7 +34,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
@@ -259,6 +257,21 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
         new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
           @Override
           public void setResponse(SubmitWorkResponseProto response) {
+            if (response.hasSubmissionState()) {
+              LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState();
+              if (ss.equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+                LOG.info(
+                    "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                        containerId + ", Service Busy");
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
+                    TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
+                return;
+              }
+            } else {
+              // TODO: Provide support for reporting errors
+              // This should never happen as server always returns a valid status on success
+              throw new RuntimeException("SubmissionState in response is expected!");
+            }
             LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
           }
 
@@ -270,23 +283,12 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
             }
             if (t instanceof RemoteException) {
-              RemoteException re = (RemoteException) t;
-              String message = re.toString();
-              // RejectedExecutions from the remote service treated as KILLED
-              if (message.contains(RejectedExecutionException.class.getName())) {
-                LOG.info(
-                    "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
-                        containerId + ", Service Busy");
-                getContext().taskKilled(taskSpec.getTaskAttemptID(),
-                    TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
-              } else {
-                // All others from the remote service cause the task to FAIL.
-                LOG.info(
-                    "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
-                        containerId, t);
-                getContext()
-                    .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
-                        t.toString());
-              }
+              // Remote-side exceptions cause the task to FAIL; rejections are now
+              // reported via SubmissionState in the response, not as exceptions.
+              LOG.info(
+                  "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+                      containerId, t);
+              getContext()
+                  .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+                      t.toString());
             } else {
               // Exception from the RPC layer - communication failure, consider as KILLED / service down.
               if (t instanceof IOException) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index 07721df..a2d944f 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -87,7 +87,14 @@ message SubmitWorkRequestProto {
   optional FragmentRuntimeInfo fragment_runtime_info = 10;
 }
 
+enum SubmissionStateProto {
+  ACCEPTED = 1;
+  REJECTED = 2;
+  EVICTED_OTHER = 3;
+}
+
 message SubmitWorkResponseProto {
+  optional SubmissionStateProto submission_state = 1;
 }
 
 message SourceStateUpdatedRequestProto {

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index 0006a9a..44c958d 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -14,8 +14,10 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -27,21 +29,28 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
 import org.junit.Test;
 
 public class TestLlapDaemonProtocolServerImpl {
 
 
   @Test(timeout = 10000)
-  public void test() throws ServiceException {
+  public void test() throws ServiceException, IOException {
     LlapConfiguration daemonConf = new LlapConfiguration();
     int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
     int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
+    ContainerRunner containerRunnerMock = mock(ContainerRunner.class);
     LlapDaemonProtocolServerImpl server =
-        new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
+        new LlapDaemonProtocolServerImpl(numHandlers, containerRunnerMock,
            new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
            rpcPort, rpcPort + 1);
-
+    when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(
+        SubmitWorkResponseProto
+            .newBuilder()
+            .setSubmissionState(SubmissionStateProto.ACCEPTED)
+            .build());
     try {
       server.init(new Configuration());
       server.start();
@@ -50,10 +59,12 @@ public class TestLlapDaemonProtocolServerImpl {
       LlapDaemonProtocolBlockingPB client =
           new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(),
               serverAddr.getPort(), null, null);
-      client.submitWork(null,
+      SubmitWorkResponseProto responseProto = client.submitWork(null,
           SubmitWorkRequestProto.newBuilder()
               .setAmHost("amhost")
               .setAmPort(2000).build());
+      assertEquals(SubmissionStateProto.ACCEPTED,
+          responseProto.getSubmissionState());
 
     } finally {
       server.stop();

http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index cb2d0e9..5491064 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr
 import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -30,7 +29,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -124,28 +122,17 @@ public class TestTaskExecutorService {
       // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots)
       // This currently serves to allow the task to be removed from the waitQueue.
       r1.awaitStart();
-      try {
-        taskExecutorService.schedule(r2);
-      } catch (RejectedExecutionException e) {
-        fail("Unexpected rejection with space available in queue");
-      }
-      try {
-        taskExecutorService.schedule(r3);
-      } catch (RejectedExecutionException e) {
-        fail("Unexpected rejection with space available in queue");
-      }
+      Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
 
-      try {
-        taskExecutorService.schedule(r4);
-        fail("Expecting a Rejection for non finishable task with a full queue");
-      } catch (RejectedExecutionException e) {
-      }
+      submissionState = taskExecutorService.schedule(r3);
+      assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
 
-      try {
-        taskExecutorService.schedule(r5);
-      } catch (RejectedExecutionException e) {
-        fail("Unexpected rejection for a finishable task");
-      }
+      submissionState = taskExecutorService.schedule(r4);
+      assertEquals(Scheduler.SubmissionState.REJECTED, submissionState);
+
+      submissionState = taskExecutorService.schedule(r5);
+      assertEquals(Scheduler.SubmissionState.EVICTED_OTHER, submissionState);
 
       // Ensure the correct task was preempted.
       assertEquals(true, r3.wasPreempted());