You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2015/12/18 23:00:47 UTC
hive git commit: HIVE-12658: Task rejection by an llap daemon spams
the log with RejectedExecutionExceptions (Prasanth Jayachandran reviewed by
Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/branch-2.0 a17c95e04 -> 1d5e9c96c
HIVE-12658: Task rejection by an llap daemon spams the log with RejectedExecutionExceptions (Prasanth Jayachandran reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d5e9c96
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d5e9c96
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d5e9c96
Branch: refs/heads/branch-2.0
Commit: 1d5e9c96ced95f2c897f83937fd65cfa06bd312d
Parents: a17c95e
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Fri Dec 18 13:45:02 2015 -0600
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Fri Dec 18 16:00:31 2015 -0600
----------------------------------------------------------------------
.../daemon/rpc/LlapDaemonProtocolProtos.java | 233 +++++++++++++++++--
.../hive/llap/daemon/ContainerRunner.java | 13 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 42 +++-
.../hive/llap/daemon/impl/LlapDaemon.java | 22 +-
.../impl/LlapDaemonProtocolServerImpl.java | 12 +-
.../hadoop/hive/llap/daemon/impl/Scheduler.java | 11 +-
.../llap/daemon/impl/TaskExecutorService.java | 28 ++-
.../llap/tezplugins/LlapTaskCommunicator.java | 41 ++--
.../src/protobuf/LlapDaemonProtocol.proto | 7 +
.../impl/TestLlapDaemonProtocolServerImpl.java | 19 +-
.../daemon/impl/TestTaskExecutorService.java | 31 +--
11 files changed, 355 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index af009b8..d2180e5 100644
--- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -90,6 +90,97 @@ public final class LlapDaemonProtocolProtos {
// @@protoc_insertion_point(enum_scope:SourceStateProto)
}
+ /**
+ * Protobuf enum {@code SubmissionStateProto}
+ */
+ public enum SubmissionStateProto
+ implements com.google.protobuf.ProtocolMessageEnum {
+ /**
+ * <code>ACCEPTED = 1;</code>
+ */
+ ACCEPTED(0, 1),
+ /**
+ * <code>REJECTED = 2;</code>
+ */
+ REJECTED(1, 2),
+ /**
+ * <code>EVICTED_OTHER = 3;</code>
+ */
+ EVICTED_OTHER(2, 3),
+ ;
+
+ /**
+ * <code>ACCEPTED = 1;</code>
+ */
+ public static final int ACCEPTED_VALUE = 1;
+ /**
+ * <code>REJECTED = 2;</code>
+ */
+ public static final int REJECTED_VALUE = 2;
+ /**
+ * <code>EVICTED_OTHER = 3;</code>
+ */
+ public static final int EVICTED_OTHER_VALUE = 3;
+
+
+ public final int getNumber() { return value; }
+
+ public static SubmissionStateProto valueOf(int value) {
+ switch (value) {
+ case 1: return ACCEPTED;
+ case 2: return REJECTED;
+ case 3: return EVICTED_OTHER;
+ default: return null;
+ }
+ }
+
+ public static com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>
+ internalGetValueMap() {
+ return internalValueMap;
+ }
+ private static com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>
+ internalValueMap =
+ new com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>() {
+ public SubmissionStateProto findValueByNumber(int number) {
+ return SubmissionStateProto.valueOf(number);
+ }
+ };
+
+ public final com.google.protobuf.Descriptors.EnumValueDescriptor
+ getValueDescriptor() {
+ return getDescriptor().getValues().get(index);
+ }
+ public final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptorForType() {
+ return getDescriptor();
+ }
+ public static final com.google.protobuf.Descriptors.EnumDescriptor
+ getDescriptor() {
+ return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getEnumTypes().get(1);
+ }
+
+ private static final SubmissionStateProto[] VALUES = values();
+
+ public static SubmissionStateProto valueOf(
+ com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
+ if (desc.getType() != getDescriptor()) {
+ throw new java.lang.IllegalArgumentException(
+ "EnumValueDescriptor is not for this type.");
+ }
+ return VALUES[desc.getIndex()];
+ }
+
+ private final int index;
+ private final int value;
+
+ private SubmissionStateProto(int index, int value) {
+ this.index = index;
+ this.value = value;
+ }
+
+ // @@protoc_insertion_point(enum_scope:SubmissionStateProto)
+ }
+
public interface UserPayloadProtoOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -8265,6 +8356,16 @@ public final class LlapDaemonProtocolProtos {
public interface SubmitWorkResponseProtoOrBuilder
extends com.google.protobuf.MessageOrBuilder {
+
+ // optional .SubmissionStateProto submission_state = 1;
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ boolean hasSubmissionState();
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState();
}
/**
* Protobuf type {@code SubmitWorkResponseProto}
@@ -8299,6 +8400,7 @@ public final class LlapDaemonProtocolProtos {
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
+ int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
@@ -8316,6 +8418,17 @@ public final class LlapDaemonProtocolProtos {
}
break;
}
+ case 8: {
+ int rawValue = input.readEnum();
+ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.valueOf(rawValue);
+ if (value == null) {
+ unknownFields.mergeVarintField(1, rawValue);
+ } else {
+ bitField0_ |= 0x00000001;
+ submissionState_ = value;
+ }
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8355,7 +8468,25 @@ public final class LlapDaemonProtocolProtos {
return PARSER;
}
+ private int bitField0_;
+ // optional .SubmissionStateProto submission_state = 1;
+ public static final int SUBMISSION_STATE_FIELD_NUMBER = 1;
+ private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_;
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ public boolean hasSubmissionState() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() {
+ return submissionState_;
+ }
+
private void initFields() {
+ submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -8369,6 +8500,9 @@ public final class LlapDaemonProtocolProtos {
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeEnum(1, submissionState_.getNumber());
+ }
getUnknownFields().writeTo(output);
}
@@ -8378,6 +8512,10 @@ public final class LlapDaemonProtocolProtos {
if (size != -1) return size;
size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeEnumSize(1, submissionState_.getNumber());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -8401,6 +8539,11 @@ public final class LlapDaemonProtocolProtos {
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto) obj;
boolean result = true;
+ result = result && (hasSubmissionState() == other.hasSubmissionState());
+ if (hasSubmissionState()) {
+ result = result &&
+ (getSubmissionState() == other.getSubmissionState());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -8414,6 +8557,10 @@ public final class LlapDaemonProtocolProtos {
}
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (hasSubmissionState()) {
+ hash = (37 * hash) + SUBMISSION_STATE_FIELD_NUMBER;
+ hash = (53 * hash) + hashEnum(getSubmissionState());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -8523,6 +8670,8 @@ public final class LlapDaemonProtocolProtos {
public Builder clear() {
super.clear();
+ submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+ bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@@ -8549,6 +8698,13 @@ public final class LlapDaemonProtocolProtos {
public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto buildPartial() {
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.submissionState_ = submissionState_;
+ result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@@ -8564,6 +8720,9 @@ public final class LlapDaemonProtocolProtos {
public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other) {
if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance()) return this;
+ if (other.hasSubmissionState()) {
+ setSubmissionState(other.getSubmissionState());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -8589,6 +8748,43 @@ public final class LlapDaemonProtocolProtos {
}
return this;
}
+ private int bitField0_;
+
+ // optional .SubmissionStateProto submission_state = 1;
+ private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ public boolean hasSubmissionState() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() {
+ return submissionState_;
+ }
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ public Builder setSubmissionState(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ submissionState_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional .SubmissionStateProto submission_state = 1;</code>
+ */
+ public Builder clearSubmissionState() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED;
+ onChanged();
+ return this;
+ }
// @@protoc_insertion_point(builder_scope:SubmitWorkResponseProto)
}
@@ -13565,30 +13761,33 @@ public final class LlapDaemonProtocolProtos {
"ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" +
"\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" +
"\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" +
- "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" +
- "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" +
- "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" +
- "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" +
- "pdatedResponseProto\"X\n\031QueryCompleteRequ" +
- "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" +
- " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo",
- "mpleteResponseProto\"g\n\035TerminateFragment" +
- "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" +
- "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" +
- " \001(\t\" \n\036TerminateFragmentResponseProto\"\026" +
- "\n\024GetTokenRequestProto\"&\n\025GetTokenRespon" +
- "seProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStatePro" +
- "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022L" +
+ "ntRuntimeInfo\"J\n\027SubmitWorkResponseProto" +
+ "\022/\n\020submission_state\030\001 \001(\0162\025.SubmissionS" +
+ "tateProto\"f\n\036SourceStateUpdatedRequestPr" +
+ "oto\022\020\n\010dag_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022" +
+ " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" +
+ "rceStateUpdatedResponseProto\"X\n\031QueryCom" +
+ "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010d",
+ "ag_name\030\002 \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034" +
+ "\n\032QueryCompleteResponseProto\"g\n\035Terminat" +
+ "eFragmentRequestProto\022\020\n\010query_id\030\001 \001(\t\022" +
+ "\020\n\010dag_name\030\002 \001(\t\022\"\n\032fragment_identifier" +
+ "_string\030\007 \001(\t\" \n\036TerminateFragmentRespon" +
+ "seProto\"\026\n\024GetTokenRequestProto\"&\n\025GetTo" +
+ "kenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020Sourc" +
+ "eStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNIN" +
+ "G\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTED\020" +
+ "\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022L",
"lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" +
"WorkRequestProto\032\030.SubmitWorkResponsePro" +
- "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp",
+ "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp" +
"datedRequestProto\032 .SourceStateUpdatedRe" +
"sponseProto\022H\n\rqueryComplete\022\032.QueryComp" +
"leteRequestProto\032\033.QueryCompleteResponse" +
"Proto\022T\n\021terminateFragment\022\036.TerminateFr" +
"agmentRequestProto\032\037.TerminateFragmentRe" +
"sponseProto2]\n\026LlapManagementProtocol\022C\n" +
- "\022getDelegationToken\022\025.GetTokenRequestPro" +
+ "\022getDelegationToken\022\025.GetTokenRequestPro",
"to\032\026.GetTokenResponseProtoBH\n&org.apache" +
".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" +
"ProtocolProtos\210\001\001\240\001\001"
@@ -13645,7 +13844,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_SubmitWorkResponseProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SubmitWorkResponseProto_descriptor,
- new java.lang.String[] { });
+ new java.lang.String[] { "SubmissionState", });
internal_static_SourceStateUpdatedRequestProto_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index f3ce33b..fc29371 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -16,19 +16,22 @@ package org.apache.hadoop.hive.llap.daemon;
import java.io.IOException;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
public interface ContainerRunner {
- void submitWork(SubmitWorkRequestProto request) throws IOException;
+ SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException;
- void sourceStateUpdated(SourceStateUpdatedRequestProto request);
+ SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request);
- void queryComplete(QueryCompleteRequestProto request);
+ QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request);
- void terminateFragment(TerminateFragmentRequestProto request);
+ TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2139bb0..0d85671 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -22,13 +22,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
@@ -39,9 +37,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
@@ -62,7 +65,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-
// TODO Convert this to a CompositeService
public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
@@ -145,7 +147,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
@Override
- public void submitWork(SubmitWorkRequestProto request) throws IOException {
+ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException {
HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
localAddress.get().getHostName(), request.getFragmentSpec().getDagName(),
request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(),
@@ -157,6 +159,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
// TODO Reduce the length of this string. Way too verbose at the moment.
String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString();
NDC.push(ndcContextString);
+ Scheduler.SubmissionState submissionState;
+ SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
try {
Map<String, String> env = new HashMap<>();
// TODO What else is required in this environment map.
@@ -191,7 +195,9 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
- LOG.debug("Registering request with the ShuffleHandler");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registering request with the ShuffleHandler");
+ }
ShuffleHandler.get()
.registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
request.getUser(), localDirs);
@@ -200,18 +206,27 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
this);
- try {
- executorService.schedule(callable);
- } catch (RejectedExecutionException e) {
+ submissionState = executorService.schedule(callable);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("SubmissionState for {} : {} ", ndcContextString, submissionState);
+ }
+
+ if (submissionState.equals(Scheduler.SubmissionState.REJECTED)) {
// Stop tracking the fragment and return the REJECTED state to the caller instead of throwing.
fragmentComplete(fragmentInfo);
- throw e;
+ return responseBuilder
+ .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()))
+ .build();
}
metrics.incrExecutorTotalRequestsHandled();
metrics.incrExecutorNumQueuedRequests();
} finally {
NDC.pop();
}
+
+ responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
+ return responseBuilder.build();
}
private static class LlapExecutionContext extends ExecutionContextImpl
@@ -230,14 +245,15 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
}
@Override
- public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+ public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request));
queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(),
request.getState());
+ return SourceStateUpdatedResponseProto.getDefaultInstance();
}
@Override
- public void queryComplete(QueryCompleteRequestProto request) {
+ public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
LOG.info("Processing queryComplete notification for {}", request.getDagName());
List<QueryFragmentInfo> knownFragments =
queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
@@ -248,12 +264,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
fragmentInfo.getFragmentIdentifierString());
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
}
+ return QueryCompleteResponseProto.getDefaultInstance();
}
@Override
- public void terminateFragment(TerminateFragmentRequestProto request) {
+ public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
executorService.killFragment(request.getFragmentIdentifierString());
+ return TerminateFragmentResponseProto.getDefaultInstance();
}
private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index e1ecf64..467ab71 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -32,16 +32,20 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.service.CompositeService;
@@ -313,25 +317,25 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
}
@Override
- public void submitWork(SubmitWorkRequestProto request) throws
+ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
IOException {
numSubmissions.incrementAndGet();
- containerRunner.submitWork(request);
+ return containerRunner.submitWork(request);
}
@Override
- public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
- containerRunner.sourceStateUpdated(request);
+ public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) {
+ return containerRunner.sourceStateUpdated(request);
}
@Override
- public void queryComplete(QueryCompleteRequestProto request) {
- containerRunner.queryComplete(request);
+ public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) {
+ return containerRunner.queryComplete(request);
}
@Override
- public void terminateFragment(TerminateFragmentRequestProto request) {
- containerRunner.terminateFragment(request);
+ public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
+ return containerRunner.terminateFragment(request);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index db0b752..f87fffe 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -93,33 +93,29 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
SubmitWorkRequestProto request) throws
ServiceException {
try {
- containerRunner.submitWork(request);
+ return containerRunner.submitWork(request);
} catch (IOException e) {
throw new ServiceException(e);
}
- return SubmitWorkResponseProto.getDefaultInstance();
}
@Override
public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller,
SourceStateUpdatedRequestProto request) throws ServiceException {
- containerRunner.sourceStateUpdated(request);
- return SourceStateUpdatedResponseProto.getDefaultInstance();
+ return containerRunner.sourceStateUpdated(request);
}
@Override
public QueryCompleteResponseProto queryComplete(RpcController controller,
QueryCompleteRequestProto request) throws ServiceException {
- containerRunner.queryComplete(request);
- return QueryCompleteResponseProto.getDefaultInstance();
+ return containerRunner.queryComplete(request);
}
@Override
public TerminateFragmentResponseProto terminateFragment(
RpcController controller,
TerminateFragmentRequestProto request) throws ServiceException {
- containerRunner.terminateFragment(request);
- return TerminateFragmentResponseProto.getDefaultInstance();
+ return containerRunner.terminateFragment(request);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
index 1d35b10..26c8e55 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java
@@ -18,19 +18,24 @@
package org.apache.hadoop.hive.llap.daemon.impl;
import java.util.Set;
-import java.util.concurrent.RejectedExecutionException;
/**
* Task scheduler interface
*/
public interface Scheduler<T> {
+ enum SubmissionState {
+ ACCEPTED, // request accepted
+ REJECTED, // request rejected as wait queue is full
+ EVICTED_OTHER; // request accepted but evicted other low priority task
+ }
+
/**
* Schedule the task. If the queues are full the task is not scheduled and SubmissionState.REJECTED is returned (no exception is thrown).
* @param t - task to schedule
- * @throws RejectedExecutionException
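+ * @return SubmissionState - ACCEPTED, REJECTED (wait queue is full), or EVICTED_OTHER (accepted, but another lower priority task was evicted)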
*/
- void schedule(T t) throws RejectedExecutionException;
+ SubmissionState schedule(T t);
/**
* Attempt to kill the fragment with the specified fragmentId
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5e2c6dd..34aa5c9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -70,7 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* run to completion immediately (canFinish = false) are added to pre-emption queue.
* <p/>
* When all the executor threads are occupied and wait queue is full, the task scheduler will
- * throw RejectedExecutionException.
+ * return a SubmissionState.REJECTED response.
* <p/>
* Task executor service can be shut down which will terminated all running tasks and reject all
* new tasks. Shutting down of the task executor service can be done gracefully or immediately.
@@ -316,9 +316,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
@Override
- public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
+ public SubmissionState schedule(TaskRunnerCallable task) {
TaskWrapper taskWrapper = new TaskWrapper(task, this);
-
+ SubmissionState result;
TaskWrapper evictedTask;
synchronized (lock) {
// If the queue does not have capacity, it does not throw a Rejection. Instead it will
@@ -328,19 +328,35 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
// The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
evictedTask = waitQueue.offer(taskWrapper);
- if (evictedTask != taskWrapper) {
+
+ // A null evictedTask means the offer was accepted without evicting anything.
+ // An evictedTask other than taskWrapper means the current task was accepted and
+ // evicted some other task.
+ if (evictedTask == null || evictedTask != taskWrapper) {
knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
taskWrapper.setIsInWaitQueue(true);
if (isDebugEnabled) {
LOG.debug("{} added to wait queue. Current wait queue size={}", task.getRequestId(),
waitQueue.size());
}
+
+ result = evictedTask == null ? SubmissionState.ACCEPTED : SubmissionState.EVICTED_OTHER;
+
+ if (isDebugEnabled && evictedTask != null) {
+ LOG.debug("Eviction: {} {} {}", taskWrapper, result, evictedTask);
+ }
} else {
if (isInfoEnabled) {
LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId());
}
evictedTask.getTaskRunnerCallable().killTask();
- throw new RejectedExecutionException("Wait queue full");
+
+ result = SubmissionState.REJECTED;
+
+ if (isDebugEnabled) {
+ LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result);
+ }
+ return result;
}
}
@@ -371,6 +387,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
synchronized (lock) {
lock.notify();
}
+
+ return result;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index ce248e9..9d47940 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,7 +34,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
@@ -259,6 +257,21 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@Override
public void setResponse(SubmitWorkResponseProto response) {
+ if (response.hasSubmissionState()) {
+ LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState();
+ if (ss.equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+ LOG.info(
+ "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+ containerId + ", Service Busy");
+ getContext().taskKilled(taskSpec.getTaskAttemptID(),
+ TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
+ return;
+ }
+ } else {
+ // TODO: Provide support for reporting errors
+ // This should never happen as server always returns a valid status on success
+ throw new RuntimeException("SubmissionState in response is expected!");
+ }
LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
}
@@ -270,23 +283,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
if (t instanceof RemoteException) {
RemoteException re = (RemoteException) t;
- String message = re.toString();
- // RejectedExecutions from the remote service treated as KILLED
- if (message.contains(RejectedExecutionException.class.getName())) {
- LOG.info(
- "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
- containerId + ", Service Busy");
- getContext().taskKilled(taskSpec.getTaskAttemptID(),
- TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
- } else {
- // All others from the remote service cause the task to FAIL.
- LOG.info(
- "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
- containerId, t);
- getContext()
- .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
- t.toString());
- }
+ // All others from the remote service cause the task to FAIL.
+ LOG.info(
+ "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
+ containerId, t);
+ getContext()
+ .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
+ t.toString());
} else {
// Exception from the RPC layer - communication failure, consider as KILLED / service down.
if (t instanceof IOException) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index 07721df..a2d944f 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -87,7 +87,14 @@ message SubmitWorkRequestProto {
optional FragmentRuntimeInfo fragment_runtime_info = 10;
}
+enum SubmissionStateProto {
+ ACCEPTED = 1;
+ REJECTED = 2;
+ EVICTED_OTHER = 3;
+}
+
message SubmitWorkResponseProto {
+ optional SubmissionStateProto submission_state = 1;
}
message SourceStateUpdatedRequestProto {
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index 0006a9a..44c958d 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -14,8 +14,10 @@
package org.apache.hadoop.hive.llap.daemon.impl;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;
@@ -27,21 +29,28 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
import org.junit.Test;
public class TestLlapDaemonProtocolServerImpl {
@Test(timeout = 10000)
- public void test() throws ServiceException {
+ public void test() throws ServiceException, IOException {
LlapConfiguration daemonConf = new LlapConfiguration();
int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);
int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
+ ContainerRunner containerRunnerMock = mock(ContainerRunner.class);
LlapDaemonProtocolServerImpl server =
- new LlapDaemonProtocolServerImpl(numHandlers, mock(ContainerRunner.class),
+ new LlapDaemonProtocolServerImpl(numHandlers, containerRunnerMock,
new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
rpcPort, rpcPort + 1);
-
+ when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(
+ SubmitWorkResponseProto
+ .newBuilder()
+ .setSubmissionState(SubmissionStateProto.ACCEPTED)
+ .build());
try {
server.init(new Configuration());
server.start();
@@ -50,10 +59,12 @@ public class TestLlapDaemonProtocolServerImpl {
LlapDaemonProtocolBlockingPB client =
new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(),
serverAddr.getPort(), null, null);
- client.submitWork(null,
+ SubmitWorkResponseProto responseProto = client.submitWork(null,
SubmitWorkRequestProto.newBuilder()
.setAmHost("amhost")
.setAmPort(2000).build());
+ assertEquals(responseProto.getSubmissionState().name(),
+ SubmissionStateProto.ACCEPTED.name());
} finally {
server.stop();
http://git-wip-us.apache.org/repos/asf/hive/blob/1d5e9c96/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index cb2d0e9..5491064 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr
import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
@@ -30,7 +29,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -124,28 +122,17 @@ public class TestTaskExecutorService {
// TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots)
// This currently serves to allow the task to be removed from the waitQueue.
r1.awaitStart();
- try {
- taskExecutorService.schedule(r2);
- } catch (RejectedExecutionException e) {
- fail("Unexpected rejection with space available in queue");
- }
- try {
- taskExecutorService.schedule(r3);
- } catch (RejectedExecutionException e) {
- fail("Unexpected rejection with space available in queue");
- }
+ Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
- try {
- taskExecutorService.schedule(r4);
- fail("Expecting a Rejection for non finishable task with a full queue");
- } catch (RejectedExecutionException e) {
- }
+ submissionState = taskExecutorService.schedule(r3);
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
- try {
- taskExecutorService.schedule(r5);
- } catch (RejectedExecutionException e) {
- fail("Unexpected rejection for a finishable task");
- }
+ submissionState = taskExecutorService.schedule(r4);
+ assertEquals(Scheduler.SubmissionState.REJECTED, submissionState);
+
+ submissionState = taskExecutorService.schedule(r5);
+ assertEquals(Scheduler.SubmissionState.EVICTED_OTHER, submissionState);
// Ensure the correct task was preempted.
assertEquals(true, r3.wasPreempted());