You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/08/21 00:57:52 UTC
[2/2] hive git commit: HIVE-11612. Allow wait queue comparator to be
specified as a classname. (Siddharth Seth)
HIVE-11612. Allow wait queue comparator to be specified as a classname. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d28b6a53
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d28b6a53
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d28b6a53
Branch: refs/heads/llap
Commit: d28b6a53e0d3091452405750d28fbb687b255fbe
Parents: b18db4f
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 20 15:57:28 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 15:57:28 2015 -0700
----------------------------------------------------------------------
.../llap/configuration/LlapConfiguration.java | 7 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 7 +-
.../llap/daemon/impl/TaskExecutorService.java | 144 ++-----
.../llap/daemon/impl/TaskRunnerCallable.java | 4 +-
.../comparator/FirstInFirstOutComparator.java | 81 ++++
.../comparator/ShortestJobFirstComparator.java | 70 ++++
.../daemon/impl/TaskExecutorTestHelpers.java | 238 +++++++++++
.../daemon/impl/TestTaskExecutorService.java | 404 +------------------
.../daemon/impl/TestTaskExecutorService2.java | 324 ---------------
.../TestFirstInFirstOutComparator.java | 321 +++++++++++++++
.../TestShortestJobFirstComparator.java | 199 +++++++++
11 files changed, 958 insertions(+), 841 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index b6633b8..0c90fe8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -140,9 +140,10 @@ public class LlapConfiguration extends Configuration {
LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size";
public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10;
- public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING =
- LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.fair.ordering";
- public static final boolean LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT = false;
+ public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME =
+ LLAP_DAEMON_PREFIX + "wait.queue.comparator.class.name";
+ public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT =
+ "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator";
public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION =
LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption";
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 710c593..411d965 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -96,9 +96,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.queryTracker = new QueryTracker(conf, localDirsBase);
addIfService(queryTracker);
- boolean useFairOrdering = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING,
- LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT);
- this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, useFairOrdering,
+ String waitQueueSchedulerClassName =
+ conf.get(LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME,
+ LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT);
+ this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName,
enablePreemption);
AuxiliaryServiceHelper.setServiceDataIntoEnv(
TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index f99c05d..badeb63 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.llap.daemon.impl;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;
@@ -103,14 +105,32 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
private final Object lock = new Object();
- public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairOrdering,
+ public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
boolean enablePreemption) {
super(TaskExecutorService.class.getSimpleName());
+ LOG.info("TaskExecutorService is being setup with parameters: "
+ + "numExecutors=" + numExecutors
+ + ", waitQueueSize=" + waitQueueSize
+ + ", waitQueueComparatorClassName=" + waitQueueComparatorClassName
+ + ", enablePreemption=" + enablePreemption);
+
final Comparator<TaskWrapper> waitQueueComparator;
- if (useFairOrdering) {
- waitQueueComparator = new FirstInFirstOutComparator();
- } else {
- waitQueueComparator = new ShortestJobFirstComparator();
+ try {
+ Class<? extends Comparator> waitQueueComparatorClazz =
+ (Class<? extends Comparator>) Class.forName(
+ waitQueueComparatorClassName);
+ Constructor<? extends Comparator> ctor = waitQueueComparatorClazz.getConstructor(null);
+ waitQueueComparator = ctor.newInstance(null);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(
+ "Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" +
+ waitQueueComparatorClassName, e);
+ } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(
+ "Failed to find instantiate wait queue comparator, class=" + waitQueueComparatorClassName,
+ e);
}
this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
@@ -137,10 +157,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
Futures.addCallback(future, new WaitQueueWorkerCallback());
- LOG.info("TaskExecutorService started with parameters: "
- + "numExecutors=" + numExecutors
- + ", waitQueueSize=" + waitQueueSize
- + ", enablePreemption=" + enablePreemption);
+
}
@Override
@@ -577,116 +594,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
- // if map tasks and reduce tasks are in finishable state then priority is given to the task
- // that has less number of pending tasks (shortest job)
- @VisibleForTesting
- public static class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
-
- @Override
- public int compare(TaskWrapper t1, TaskWrapper t2) {
- TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
- TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
- boolean o1CanFinish = o1.canFinish();
- boolean o2CanFinish = o2.canFinish();
- if (o1CanFinish == true && o2CanFinish == false) {
- return -1;
- } else if (o1CanFinish == false && o2CanFinish == true) {
- return 1;
- }
-
- FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
- FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
-
- // Check if these belong to the same task, and work with withinDagPriority
- if (o1.getQueryId().equals(o2.getQueryId())) {
- // Same Query
- // Within dag priority - lower values indicate higher priority.
- if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
- return -1;
- } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
- return 1;
- }
- }
-
- // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
- // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
- int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
- int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
- if (knownPending1 < knownPending2) {
- return -1;
- } else if (knownPending1 > knownPending2) {
- return 1;
- }
-
- if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
- return -1;
- } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
- return 1;
- }
- return 0;
- }
- }
-
- // if map tasks and reduce tasks are in finishable state then priority is given to the task in
- // the following order
- // 1) Dag start time
- // 2) Within dag priority
- // 3) Attempt start time
- // 4) Vertex parallelism
- @VisibleForTesting
- public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
-
- @Override
- public int compare(TaskWrapper t1, TaskWrapper t2) {
- TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
- TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
- boolean o1CanFinish = o1.canFinish();
- boolean o2CanFinish = o2.canFinish();
- if (o1CanFinish == true && o2CanFinish == false) {
- return -1;
- } else if (o1CanFinish == false && o2CanFinish == true) {
- return 1;
- }
-
- FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
- FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
-
- if (fri1.getDagStartTime() < fri2.getDagStartTime()) {
- return -1;
- } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) {
- return 1;
- }
-
- // Check if these belong to the same task, and work with withinDagPriority
- if (o1.getQueryId().equals(o2.getQueryId())) {
- // Same Query
- // Within dag priority - lower values indicate higher priority.
- if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
- return -1;
- } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
- return 1;
- }
- }
-
- if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
- return -1;
- } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
- return 1;
- }
- // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
- // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
- int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
- int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
- if (knownPending1 < knownPending2) {
- return -1;
- } else if (knownPending1 > knownPending2) {
- return 1;
- }
-
- return 0;
- }
- }
@VisibleForTesting
public static class PreemptionQueueComparator implements Comparator<TaskWrapper> {
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 6ceb2e5..e0bd48a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.CallableWithNdc;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -105,7 +106,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private final AtomicBoolean killInvoked = new AtomicBoolean(false);
- TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
+ @VisibleForTesting
+ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
Configuration conf,
ExecutionContext executionContext, Map<String, String> envMap,
Credentials credentials,
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
new file mode 100644
index 0000000..447fc7b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+
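+// Finishable tasks are ordered ahead of non-finishable ones. When both tasks have the same
+// finishable state, priority is decided in the following order:
+// 1) Dag start time
+// 2) Within dag priority (compared only when both tasks belong to the same query)
+// 3) First attempt start time
+// 4) Known pending tasks (self and upstream tasks minus the completed ones)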
+public class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
+
+ @Override
+ public int compare(TaskWrapper t1, TaskWrapper t2) {
+ TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+ TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
+ boolean o1CanFinish = o1.canFinish();
+ boolean o2CanFinish = o2.canFinish();
+ if (o1CanFinish == true && o2CanFinish == false) {
+ return -1;
+ } else if (o1CanFinish == false && o2CanFinish == true) {
+ return 1;
+ }
+
+ LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+ LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+ if (fri1.getDagStartTime() < fri2.getDagStartTime()) {
+ return -1;
+ } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) {
+ return 1;
+ }
+
+ // Check if these belong to the same query, and work with withinDagPriority
+ if (o1.getQueryId().equals(o2.getQueryId())) {
+ // Same Query
+ // Within dag priority - lower values indicate higher priority.
+ if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+ return -1;
+ } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
+ return 1;
+ }
+ }
+
+ if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
+ return -1;
+ } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
+ return 1;
+ }
+
+ // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
+ // its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+ int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+ int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+ if (knownPending1 < knownPending2) {
+ return -1;
+ } else if (knownPending1 > knownPending2) {
+ return 1;
+ }
+
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
new file mode 100644
index 0000000..238ae9e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+
+// Finishable tasks come first; for the same query, within-dag priority is compared next; then
+// priority is given to the task with fewer known pending tasks (shortest job).
+public class ShortestJobFirstComparator implements Comparator<TaskWrapper> {
+
+ @Override
+ public int compare(TaskWrapper t1, TaskWrapper t2) {
+ TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+ TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
+ boolean o1CanFinish = o1.canFinish();
+ boolean o2CanFinish = o2.canFinish();
+ if (o1CanFinish == true && o2CanFinish == false) {
+ return -1;
+ } else if (o1CanFinish == false && o2CanFinish == true) {
+ return 1;
+ }
+
+ LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+ LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+ // Check if these belong to the same query, and work with withinDagPriority
+ if (o1.getQueryId().equals(o2.getQueryId())) {
+ // Same Query
+ // Within dag priority - lower values indicate higher priority.
+ if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+ return -1;
+ } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
+ return 1;
+ }
+ }
+
+ // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
+ // its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+ int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+ int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+ if (knownPending1 < knownPending2) {
+ return -1;
+ } else if (knownPending1 > knownPending2) {
+ return 1;
+ }
+
+ if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
+ return -1;
+ } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
+ return 1;
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
new file mode 100644
index 0000000..ec1ffcf
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskExecutorTestHelpers {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class);
+
+ public static MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime,
+ boolean canFinish, long workTime) {
+ SubmitWorkRequestProto
+ requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
+ startTime);
+ MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime);
+ return mockRequest;
+ }
+
+ public static TaskExecutorService.TaskWrapper createTaskWrapper(
+ SubmitWorkRequestProto request, boolean canFinish, int workTime) {
+ MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
+ TaskExecutorService.TaskWrapper
+ taskWrapper = new TaskExecutorService.TaskWrapper(mockRequest, null);
+ return taskWrapper;
+ }
+
+
+ public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+ int fragmentNumber, int selfAndUpstreamParallelism,
+ long attemptStartTime) {
+ return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0,
+ attemptStartTime, 1);
+ }
+
+ public static SubmitWorkRequestProto createSubmitWorkRequestProto(
+ int fragmentNumber, int selfAndUpstreamParallelism,
+ int selfAndUpstreamComplete,
+ long attemptStartTime, int withinDagPriority) {
+ ApplicationId appId = ApplicationId.newInstance(9999, 72);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+ TezTaskID tId = TezTaskID.getInstance(vId, 389);
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
+ return SubmitWorkRequestProto
+ .newBuilder()
+ .setFragmentSpec(
+ LlapDaemonProtocolProtos.FragmentSpecProto
+ .newBuilder()
+ .setAttemptNumber(0)
+ .setDagName("MockDag")
+ .setFragmentNumber(fragmentNumber)
+ .setVertexName("MockVertex")
+ .setProcessorDescriptor(
+ LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder()
+ .setClassName("MockProcessor").build())
+ .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
+ .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
+ .setContainerIdString("MockContainer_1").setUser("MockUser")
+ .setTokenIdentifier("MockToken_1")
+ .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+ .FragmentRuntimeInfo
+ .newBuilder()
+ .setFirstAttemptStartTime(attemptStartTime)
+ .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
+ .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
+ .setWithinDagPriority(withinDagPriority)
+ .build())
+ .build();
+ }
+
+ public static class MockRequest extends TaskRunnerCallable {
+ private final long workTime;
+ private final boolean canFinish;
+
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
+ private final AtomicBoolean wasKilled = new AtomicBoolean(false);
+ private final AtomicBoolean wasInterrupted = new AtomicBoolean(false);
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition startedCondition = lock.newCondition();
+ private final Condition sleepCondition = lock.newCondition();
+ private final Condition finishedCondition = lock.newCondition();
+
+ public MockRequest(SubmitWorkRequestProto requestProto,
+ boolean canFinish, long workTime) {
+ super(requestProto, mock(QueryFragmentInfo.class), new Configuration(),
+ new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock(
+ LlapDaemonExecutorMetrics.class),
+ mock(KilledTaskHandler.class), mock(
+ FragmentCompletionHandler.class));
+ this.workTime = workTime;
+ this.canFinish = canFinish;
+ }
+
+ @Override
+ protected TaskRunner2Result callInternal() {
+ try {
+ logInfo(super.getRequestId() + " is executing..", null);
+ lock.lock();
+ try {
+ isStarted.set(true);
+ startedCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+
+ lock.lock();
+ try {
+ sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ wasInterrupted.set(true);
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+ } finally {
+ lock.unlock();
+ }
+ if (wasKilled.get()) {
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+ } else {
+ return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+ }
+ } finally {
+ lock.lock();
+ try {
+ isFinished.set(true);
+ finishedCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public void killTask() {
+ lock.lock();
+ try {
+ wasKilled.set(true);
+ sleepCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ boolean hasStarted() {
+ return isStarted.get();
+ }
+
+ boolean hasFinished() {
+ return isFinished.get();
+ }
+
+ boolean wasPreempted() {
+ return wasKilled.get();
+ }
+
+ void complete() {
+ lock.lock();
+ try {
+ sleepCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void awaitStart() throws InterruptedException {
+ lock.lock();
+ try {
+ while (!isStarted.get()) {
+ startedCondition.await();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void awaitEnd() throws InterruptedException {
+ lock.lock();
+ try {
+ while (!isFinished.get()) {
+ finishedCondition.await();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ @Override
+ public boolean canFinish() {
+ return canFinish;
+ }
+ }
+
+ private static void logInfo(String message, Throwable t) {
+ LOG.info(message, t);
+ }
+
+ private static void logInfo(String message) {
+ logInfo(message, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 7a01b39..34ab40a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -17,227 +17,31 @@
*/
package org.apache.hadoop.hive.llap.daemon.impl;
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createMockRequest;
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto;
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
-import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.task.EndReason;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest;
+import org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator;
import org.apache.tez.runtime.task.TaskRunner2Result;
-import org.junit.Before;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class TestTaskExecutorService {
- private static Configuration conf;
- private static Credentials cred = new Credentials();
- private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class);
-
- @Before
- public void setup() {
- conf = new Configuration();
- }
-
-
- @Test(timeout = 5000)
- public void testWaitQueueComparator() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
- TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
- TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000);
- EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r1, queue.peek());
- // this offer will be rejected
- assertEquals(r5, queue.offer(r5));
- assertEquals(r1, queue.take());
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r4, queue.take());
-
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r1, queue.peek());
- // this offer will be rejected
- assertEquals(r5, queue.offer(r5));
- assertEquals(r1, queue.take());
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r4, queue.take());
-
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r1, queue.peek());
- // offer accepted and r4 gets evicted
- assertEquals(r4, queue.offer(r5));
- assertEquals(r1, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r5, queue.take());
- assertEquals(r2, queue.take());
-
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r1, queue.peek());
- // offer accepted and r4 gets evicted
- assertEquals(r4, queue.offer(r5));
- assertEquals(r1, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r5, queue.take());
- assertEquals(r2, queue.take());
-
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r1, queue.peek());
- // offer accepted and r4 gets evicted
- assertEquals(r4, queue.offer(r5));
- assertEquals(r1, queue.take());
- assertEquals(r5, queue.take());
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
-
- r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
- r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000);
- r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000);
- r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000);
- r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r2, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r2, queue.peek());
- // offer accepted, r1 evicted
- assertEquals(r1, queue.offer(r5));
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r4, queue.take());
- assertEquals(r5, queue.take());
- }
-
- @Test(timeout = 5000)
- public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
-
- EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
-
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r1, queue.take());
- }
-
- @Test(timeout = 5000)
- public void testWaitQueueComparatorParallelism() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
- TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
- TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
-
- EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
-
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r1, queue.take());
- }
@Test(timeout = 5000)
public void testPreemptionQueueComparator() throws InterruptedException {
@@ -265,8 +69,9 @@ public class TestTaskExecutorService {
public void testFinishablePreeptsNonFinishable() throws InterruptedException {
MockRequest r1 = createMockRequest(1, 1, 100, false, 5000l);
MockRequest r2 = createMockRequest(2, 1, 100, true, 1000l);
- TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, false, true);
- taskExecutorService.init(conf);
+ TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2,
+ ShortestJobFirstComparator.class.getName(), true);
+ taskExecutorService.init(new Configuration());
taskExecutorService.start();
try {
@@ -306,8 +111,8 @@ public class TestTaskExecutorService {
MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l);
TaskExecutorServiceForTest taskExecutorService =
- new TaskExecutorServiceForTest(1, 2, false, true);
- taskExecutorService.init(conf);
+ new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);
+ taskExecutorService.init(new Configuration());
taskExecutorService.start();
try {
@@ -379,197 +184,12 @@ public class TestTaskExecutorService {
}
- // ----------- Helper classes and methods go after this point. Tests above this -----------
-
- // Create requests with the same within dag priority
- private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
- long attemptStartTime) {
- return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1);
- }
-
- private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
- int selfAndUpstreamComplete,
- long attemptStartTime, int withinDagPriority) {
- ApplicationId appId = ApplicationId.newInstance(9999, 72);
- TezDAGID dagId = TezDAGID.getInstance(appId, 1);
- TezVertexID vId = TezVertexID.getInstance(dagId, 35);
- TezTaskID tId = TezTaskID.getInstance(vId, 389);
- TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
- return SubmitWorkRequestProto
- .newBuilder()
- .setFragmentSpec(
- FragmentSpecProto
- .newBuilder()
- .setAttemptNumber(0)
- .setDagName("MockDag")
- .setFragmentNumber(fragmentNumber)
- .setVertexName("MockVertex")
- .setProcessorDescriptor(
- EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
- .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
- .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
- .setContainerIdString("MockContainer_1").setUser("MockUser")
- .setTokenIdentifier("MockToken_1")
- .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
- .FragmentRuntimeInfo
- .newBuilder()
- .setFirstAttemptStartTime(attemptStartTime)
- .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
- .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
- .setWithinDagPriority(withinDagPriority)
- .build())
- .build();
- }
-
- private MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime,
- boolean canFinish, long workTime) {
- SubmitWorkRequestProto requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism,
- startTime);
- MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime);
- return mockRequest;
- }
-
- private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
- MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
- TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
- return taskWrapper;
- }
-
- private static void logInfo(String message, Throwable t) {
- LOG.info(message, t);
- }
-
- private static void logInfo(String message) {
- logInfo(message, null);
- }
-
- private static class MockRequest extends TaskRunnerCallable {
- private final long workTime;
- private final boolean canFinish;
-
- private final AtomicBoolean isStarted = new AtomicBoolean(false);
- private final AtomicBoolean isFinished = new AtomicBoolean(false);
- private final AtomicBoolean wasKilled = new AtomicBoolean(false);
- private final AtomicBoolean wasInterrupted = new AtomicBoolean(false);
-
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition startedCondition = lock.newCondition();
- private final Condition sleepCondition = lock.newCondition();
- private final Condition finishedCondition = lock.newCondition();
-
- public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto,
- boolean canFinish, long workTime) {
- super(requestProto, mock(QueryFragmentInfo.class), conf,
- new ExecutionContextImpl("localhost"), null, cred, 0, null, null, mock(
- LlapDaemonExecutorMetrics.class),
- mock(KilledTaskHandler.class), mock(
- FragmentCompletionHandler.class));
- this.workTime = workTime;
- this.canFinish = canFinish;
- }
- @Override
- protected TaskRunner2Result callInternal() {
- try {
- logInfo(super.getRequestId() + " is executing..", null);
- lock.lock();
- try {
- isStarted.set(true);
- startedCondition.signal();
- } finally {
- lock.unlock();
- }
-
- lock.lock();
- try {
- sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- wasInterrupted.set(true);
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
- } finally {
- lock.unlock();
- }
- if (wasKilled.get()) {
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
- } else {
- return new TaskRunner2Result(EndReason.SUCCESS, null, false);
- }
- } finally {
- lock.lock();
- try {
- isFinished.set(true);
- finishedCondition.signal();
- } finally {
- lock.unlock();
- }
- }
- }
-
- @Override
- public void killTask() {
- lock.lock();
- try {
- wasKilled.set(true);
- sleepCondition.signal();
- } finally {
- lock.unlock();
- }
- }
-
- boolean hasStarted() {
- return isStarted.get();
- }
-
- boolean hasFinished() {
- return isFinished.get();
- }
-
- boolean wasPreempted() {
- return wasKilled.get();
- }
-
- void complete() {
- lock.lock();
- try {
- sleepCondition.signal();
- } finally {
- lock.unlock();
- }
- }
-
- void awaitStart() throws InterruptedException {
- lock.lock();
- try {
- while (!isStarted.get()) {
- startedCondition.await();
- }
- } finally {
- lock.unlock();
- }
- }
-
- void awaitEnd() throws InterruptedException {
- lock.lock();
- try {
- while (!isFinished.get()) {
- finishedCondition.await();
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- @Override
- public boolean canFinish() {
- return canFinish;
- }
- }
private static class TaskExecutorServiceForTest extends TaskExecutorService {
- public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, boolean useFairOrdering,
+ public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
boolean enablePreemption) {
- super(numExecutors, waitQueueSize, useFairOrdering, enablePreemption);
+ super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption);
}
private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>();
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
deleted file mode 100644
index 1929439..0000000
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.daemon.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
-import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
-import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
-import org.apache.tez.runtime.task.EndReason;
-import org.apache.tez.runtime.task.TaskRunner2Result;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestTaskExecutorService2 {
- private static Configuration conf;
- private static Credentials cred = new Credentials();
-
- private static class MockRequest extends TaskRunnerCallable {
- private int workTime;
- private boolean canFinish;
-
- public MockRequest(SubmitWorkRequestProto requestProto,
- boolean canFinish, int workTime) {
- super(requestProto, mock(QueryFragmentInfo.class), conf,
- new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
- mock(KilledTaskHandler.class), mock(
- FragmentCompletionHandler.class));
- this.workTime = workTime;
- this.canFinish = canFinish;
- }
-
- @Override
- protected TaskRunner2Result callInternal() {
- System.out.println(super.getRequestId() + " is executing..");
- try {
- Thread.sleep(workTime);
- } catch (InterruptedException e) {
- return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
- }
- return new TaskRunner2Result(EndReason.SUCCESS, null, false);
- }
-
- @Override
- public boolean canFinish() {
- return canFinish;
- }
- }
-
- @Before
- public void setup() {
- conf = new Configuration();
- }
-
- private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
- int attemptStartTime) {
- // Same priority for all tasks.
- return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
- }
-
- private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
- int numSelfAndUpstreamComplete, int dagStartTime,
- int attemptStartTime, int withinDagPriority) {
- ApplicationId appId = ApplicationId.newInstance(9999, 72);
- TezDAGID dagId = TezDAGID.getInstance(appId, 1);
- TezVertexID vId = TezVertexID.getInstance(dagId, 35);
- TezTaskID tId = TezTaskID.getInstance(vId, 389);
- TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
- return SubmitWorkRequestProto
- .newBuilder()
- .setFragmentSpec(
- FragmentSpecProto
- .newBuilder()
- .setAttemptNumber(0)
- .setDagName("MockDag")
- .setFragmentNumber(fragmentNumber)
- .setVertexName("MockVertex")
- .setProcessorDescriptor(
- EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
- .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
- .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
- .setContainerIdString("MockContainer_1").setUser("MockUser")
- .setTokenIdentifier("MockToken_1")
- .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
- .FragmentRuntimeInfo
- .newBuilder()
- .setDagStartTime(dagStartTime)
- .setFirstAttemptStartTime(attemptStartTime)
- .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
- .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
- .setWithinDagPriority(withinDagPriority)
- .build())
- .build();
- }
-
- @Test
- public void testWaitQueueComparator() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
- TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
- TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
- EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r4, queue.peek());
- // this offer will be accepted and r1 evicted
- assertEquals(r1, queue.offer(r5));
- assertEquals(r5, queue.take());
- assertEquals(r4, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r2, queue.take());
-
- r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
- r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
- r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
- r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
- r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r4, queue.peek());
- // this offer will be accpeted and r1 evicted
- assertEquals(r1, queue.offer(r5));
- assertEquals(r5, queue.take());
- assertEquals(r4, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r2, queue.take());
-
- r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000);
- r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000);
- r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000);
- r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000);
- r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r3, queue.peek());
- // offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
- assertEquals(r5, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r1, queue.take());
- assertEquals(r4, queue.take());
-
- r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
- r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
- r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
- r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
- r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r3, queue.peek());
- // offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
- assertEquals(r5, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r1, queue.take());
- assertEquals(r4, queue.take());
-
- r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
- r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
- r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
- r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
- r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r1, queue.peek());
- // offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
- assertEquals(r5, queue.take());
- assertEquals(r1, queue.take());
- assertEquals(r4, queue.take());
- assertEquals(r3, queue.take());
-
- r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
- r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
- r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
- r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
- r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r4, queue.peek());
- // offer accepted, r1 evicted
- assertEquals(r1, queue.offer(r5));
- assertEquals(r5, queue.take());
- assertEquals(r4, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r2, queue.take());
-
- r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
- r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
- r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
- r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
- r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
- queue = new EvictingPriorityBlockingQueue(
- new TaskExecutorService.FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
- assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
- assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
- assertEquals(r4, queue.peek());
- // offer accepted, r1 evicted
- assertEquals(r1, queue.offer(r5));
- assertEquals(r4, queue.take());
- assertEquals(r5, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r2, queue.take());
- }
-
- @Test(timeout = 5000)
- public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
-
- EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
-
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r1, queue.take());
- }
-
- @Test(timeout = 5000)
- public void testWaitQueueComparatorParallelism() throws InterruptedException {
- TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
- TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
- TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
-
- EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
- new TaskExecutorService.ShortestJobFirstComparator(), 4);
-
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
-
- assertEquals(r2, queue.take());
- assertEquals(r3, queue.take());
- assertEquals(r1, queue.take());
- }
-
-
- private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
- MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
- TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);
- return taskWrapper;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/d28b6a53/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
new file mode 100644
index 0000000..ebfb430
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl.comparator;
+
+import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
+import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
+import org.apache.tez.runtime.task.EndReason;
+import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFirstInFirstOutComparator {
+ private static Configuration conf;
+ private static Credentials cred = new Credentials();
+
+ private static class MockRequest extends TaskRunnerCallable {
+ private int workTime;
+ private boolean canFinish;
+
+ public MockRequest(SubmitWorkRequestProto requestProto,
+ boolean canFinish, int workTime) {
+ super(requestProto, mock(QueryFragmentInfo.class), conf,
+ new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null,
+ mock(KilledTaskHandler.class), mock(
+ FragmentCompletionHandler.class));
+ this.workTime = workTime;
+ this.canFinish = canFinish;
+ }
+
+ @Override
+ protected TaskRunner2Result callInternal() {
+ System.out.println(super.getRequestId() + " is executing..");
+ try {
+ Thread.sleep(workTime);
+ } catch (InterruptedException e) {
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+ }
+ return new TaskRunner2Result(EndReason.SUCCESS, null, false);
+ }
+
+ @Override
+ public boolean canFinish() {
+ return canFinish;
+ }
+ }
+
+ @Before
+ public void setup() {
+ conf = new Configuration();
+ }
+
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
+ int attemptStartTime) {
+ // Same within-DAG priority (1) and zero completed upstream tasks for every request.
+ return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
+ }
+
+ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+ int numSelfAndUpstreamComplete, int dagStartTime,
+ int attemptStartTime, int withinDagPriority) {
+ ApplicationId appId = ApplicationId.newInstance(9999, 72);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vId = TezVertexID.getInstance(dagId, 35);
+ TezTaskID tId = TezTaskID.getInstance(vId, 389);
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber);
+ return SubmitWorkRequestProto
+ .newBuilder()
+ .setFragmentSpec(
+ FragmentSpecProto
+ .newBuilder()
+ .setAttemptNumber(0)
+ .setDagName("MockDag")
+ .setFragmentNumber(fragmentNumber)
+ .setVertexName("MockVertex")
+ .setProcessorDescriptor(
+ EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
+ .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
+ .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1")
+ .setContainerIdString("MockContainer_1").setUser("MockUser")
+ .setTokenIdentifier("MockToken_1")
+ .setFragmentRuntimeInfo(LlapDaemonProtocolProtos
+ .FragmentRuntimeInfo
+ .newBuilder()
+ .setDagStartTime(dagStartTime)
+ .setFirstAttemptStartTime(attemptStartTime)
+ .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
+ .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
+ .setWithinDagPriority(withinDagPriority)
+ .build())
+ .build();
+ }
+
+ @Test
+ public void testWaitQueueComparator() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+ TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+ TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // this offer will be accepted and r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // this offer will be accepted and r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000);
+ r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r3, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r4, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r3, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r4, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r1, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // offer accepted, r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r5, queue.take());
+ assertEquals(r4, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+
+ r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000);
+ r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000);
+ r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000);
+ r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000);
+ r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
+ queue = new EvictingPriorityBlockingQueue(
+ new FirstInFirstOutComparator(), 4);
+ assertNull(queue.offer(r1));
+ assertEquals(r1, queue.peek());
+ assertNull(queue.offer(r2));
+ assertEquals(r2, queue.peek());
+ assertNull(queue.offer(r3));
+ assertEquals(r3, queue.peek());
+ assertNull(queue.offer(r4));
+ assertEquals(r4, queue.peek());
+ // offer accepted, r1 evicted
+ assertEquals(r1, queue.offer(r5));
+ assertEquals(r4, queue.take());
+ assertEquals(r5, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r2, queue.take());
+ }
+
+ @Test(timeout = 5000)
+ public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new FirstInFirstOutComparator(), 4);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ assertEquals(r2, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ }
+
+ @Test(timeout = 5000)
+ public void testWaitQueueComparatorParallelism() throws InterruptedException {
+ TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
+ TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
+ TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+
+ EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+ new FirstInFirstOutComparator(), 4);
+
+ assertNull(queue.offer(r1));
+ assertNull(queue.offer(r2));
+ assertNull(queue.offer(r3));
+
+ assertEquals(r2, queue.take());
+ assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ }
+}