You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2015/08/14 03:03:03 UTC

hive git commit: HIVE-11272. LLAP: Execution order within LLAP daemons should consider query-specific priority assigned to fragments. (Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/llap 3bf0a45f8 -> a8ac648c8


HIVE-11272. LLAP: Execution order within LLAP daemons should consider query-specific priority assigned to fragments. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a8ac648c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a8ac648c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a8ac648c

Branch: refs/heads/llap
Commit: a8ac648c86f11f856b1a307aa0e545482618b769
Parents: 3bf0a45
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Aug 13 18:01:52 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 13 18:01:52 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskExecutorService.java   | 94 +++++++++++++++-----
 .../llap/daemon/impl/TaskRunnerCallable.java    | 36 ++++----
 .../daemon/impl/TestTaskExecutorService.java    | 50 ++++++++++-
 .../daemon/impl/TestTaskExecutorService2.java   | 52 ++++++++++-
 4 files changed, 187 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 5099a5c..f99c05d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.runtime.task.EndReason;
@@ -380,6 +381,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     try {
       synchronized (lock) {
         boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+        LOG.info("Attempting to execute {}", taskWrapper);
         ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable());
         taskWrapper.setIsInWaitQueue(false);
         FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(taskWrapper);
@@ -584,23 +586,41 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     public int compare(TaskWrapper t1, TaskWrapper t2) {
       TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
       TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      boolean newCanFinish = o1.canFinish();
-      boolean oldCanFinish = o2.canFinish();
-      if (newCanFinish == true && oldCanFinish == false) {
+      boolean o1CanFinish = o1.canFinish();
+      boolean o2CanFinish = o2.canFinish();
+      if (o1CanFinish == true && o2CanFinish == false) {
         return -1;
-      } else if (newCanFinish == false && oldCanFinish == true) {
+      } else if (o1CanFinish == false && o2CanFinish == true) {
         return 1;
       }
 
-      if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+      // Check if these belong to the same task, and work with withinDagPriority
+      if (o1.getQueryId().equals(o2.getQueryId())) {
+        // Same Query
+        // Within dag priority - lower values indicate higher priority.
+        if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+          return -1;
+        } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
+          return 1;
+        }
+      }
+
+      // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
+      // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+      int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+      int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+      if (knownPending1 < knownPending2) {
         return -1;
-      } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      } else if (knownPending1 > knownPending2) {
         return 1;
       }
 
-      if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+      if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
         return -1;
-      } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+      } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
         return 1;
       }
       return 0;
@@ -610,8 +630,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
   // if map tasks and reduce tasks are in finishable state then priority is given to the task in
   // the following order
   // 1) Dag start time
-  // 2) Attempt start time
-  // 3) Vertex parallelism
+  // 2) Within dag priority
+  // 3) Attempt start time
+  // 4) Vertex parallelism
   @VisibleForTesting
   public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> {
 
@@ -619,29 +640,47 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     public int compare(TaskWrapper t1, TaskWrapper t2) {
       TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
       TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      boolean newCanFinish = o1.canFinish();
-      boolean oldCanFinish = o2.canFinish();
-      if (newCanFinish == true && oldCanFinish == false) {
+      boolean o1CanFinish = o1.canFinish();
+      boolean o2CanFinish = o2.canFinish();
+      if (o1CanFinish == true && o2CanFinish == false) {
         return -1;
-      } else if (newCanFinish == false && oldCanFinish == true) {
+      } else if (o1CanFinish == false && o2CanFinish == true) {
         return 1;
       }
 
-      if (o1.getDagStartTime() < o2.getDagStartTime()) {
+      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+      if (fri1.getDagStartTime() < fri2.getDagStartTime()) {
         return -1;
-      } else if (o1.getDagStartTime() > o2.getDagStartTime()) {
+      } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) {
         return 1;
       }
 
-      if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) {
+      // Check if these belong to the same task, and work with withinDagPriority
+      if (o1.getQueryId().equals(o2.getQueryId())) {
+        // Same Query
+        // Within dag priority - lower values indicate higher priority.
+        if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) {
+          return -1;
+        } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){
+          return 1;
+        }
+      }
+
+      if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) {
         return -1;
-      } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) {
+      } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) {
         return 1;
       }
 
-      if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and
+      // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+      int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks();
+      int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks();
+      if (knownPending1 < knownPending2) {
         return -1;
-      } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      } else if (knownPending1 > knownPending2) {
         return 1;
       }
 
@@ -656,9 +695,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     public int compare(TaskWrapper t1, TaskWrapper t2) {
       TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
       TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
-      if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
+      FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
+      FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
+
+      if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) {
         return 1;
-      } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
+      } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) {
         return -1;
       }
       return 0;
@@ -738,8 +780,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
           ", inPreemptionQueue=" + inPreemptionQueue +
           ", registeredForNotifications=" + registeredForNotifications +
           ", canFinish=" + taskRunnerCallable.canFinish() +
-          ", firstAttemptStartTime=" + taskRunnerCallable.getFirstAttemptStartTime() +
-          ", vertexParallelism=" + taskRunnerCallable.getVertexParallelism() +
+          ", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() +
+          ", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() +
+          ", withinDagPriority=" + taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() +
+          ", vertexParallelism= " + taskRunnerCallable.getFragmentSpec().getVertexParallelism() +
+          ", selfAndUpstreamParallelism= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
+          ", selfAndUpstreamComplete= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
           '}';
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 52f21d9..6ceb2e5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -96,6 +97,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private volatile String threadName;
   private final LlapDaemonExecutorMetrics metrics;
   private final String requestId;
+  private final String queryId;
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
@@ -129,7 +131,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         request.getUser(), jobToken, null, request.getFragmentSpec().getDagName());
     }
     this.metrics = metrics;
-    this.requestId = getRequestId(request);
+    this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
+    // TODO Change this to the queryId/Name when that's available.
+    this.queryId = request.getFragmentSpec().getDagName();
     this.killedTaskHandler = killedTaskHandler;
     this.fragmentCompletionHanler = fragmentCompleteHandler;
   }
@@ -330,8 +334,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   @Override
   public String toString() {
     return requestId + " {canFinish: " + canFinish() +
-        " vertexParallelism: " + getVertexParallelism() +
-        " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}";
+        ", vertexParallelism: " + request.getFragmentSpec().getVertexParallelism() +
+        ", selfAndUpstreamParallelism: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() +
+        ", selfAndUpstreamComplete: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() +
+        ", firstAttemptStartTime: " + getFragmentRuntimeInfo().getFirstAttemptStartTime() +
+        ", dagStartTime:" + getFragmentRuntimeInfo().getDagStartTime() +
+        ", withinDagPriority: " + getFragmentRuntimeInfo().getWithinDagPriority() +
+        "}";
   }
 
   @Override
@@ -347,14 +356,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     return requestId.equals(((TaskRunnerCallable) obj).getRequestId());
   }
 
-  public int getVertexParallelism() {
-    return request.getFragmentSpec().getVertexParallelism();
-  }
-
   public String getRequestId() {
     return requestId;
   }
 
+  public String getQueryId() {
+    return queryId;
+  }
+
   public QueryFragmentInfo getFragmentInfo() {
     return fragmentInfo;
   }
@@ -470,16 +479,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     return sb.toString();
   }
 
-  private static String getRequestId(SubmitWorkRequestProto request) {
-    return request.getFragmentSpec().getFragmentIdentifierString();
+  public FragmentRuntimeInfo getFragmentRuntimeInfo() {
+    return request.getFragmentRuntimeInfo();
   }
 
-  public long getFirstAttemptStartTime() {
-    return request.getFragmentRuntimeInfo().getFirstAttemptStartTime();
+  public FragmentSpecProto getFragmentSpec() {
+    return request.getFragmentSpec();
   }
-
-  public long getDagStartTime() {
-    return request.getFragmentRuntimeInfo().getDagStartTime();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 25f7a81..7a01b39 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -67,6 +67,7 @@ public class TestTaskExecutorService {
     conf = new Configuration();
   }
 
+
   @Test(timeout = 5000)
   public void testWaitQueueComparator() throws InterruptedException {
     TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
@@ -203,6 +204,42 @@ public class TestTaskExecutorService {
   }
 
   @Test(timeout = 5000)
+  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorParallelism() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
   public void testPreemptionQueueComparator() throws InterruptedException {
     TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000);
     TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000);
@@ -344,8 +381,15 @@ public class TestTaskExecutorService {
 
   // ----------- Helper classes and methods go after this point. Tests above this -----------
 
-  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int parallelism,
+  // Create requests with the same within dag priority
+  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
                                                               long attemptStartTime) {
+    return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1);
+  }
+
+  private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism,
+                                                              int selfAndUpstreamComplete,
+                                                              long attemptStartTime, int withinDagPriority) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -360,7 +404,6 @@ public class TestTaskExecutorService {
                 .setDagName("MockDag")
                 .setFragmentNumber(fragmentNumber)
                 .setVertexName("MockVertex")
-                .setVertexParallelism(parallelism)
                 .setProcessorDescriptor(
                     EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
                 .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
@@ -371,6 +414,9 @@ public class TestTaskExecutorService {
             .FragmentRuntimeInfo
             .newBuilder()
             .setFirstAttemptStartTime(attemptStartTime)
+            .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism)
+            .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete)
+            .setWithinDagPriority(withinDagPriority)
             .build())
         .build();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
index ad2a15b..1929439 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java
@@ -81,8 +81,15 @@ public class TestTaskExecutorService2 {
     conf = new Configuration();
   }
 
-  private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime,
-      int attemptStartTime) {
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime,
+                                               int attemptStartTime) {
+    // Same priority for all tasks.
+    return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1);
+  }
+
+  private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks,
+                                               int numSelfAndUpstreamComplete, int dagStartTime,
+                                               int attemptStartTime, int withinDagPriority) {
     ApplicationId appId = ApplicationId.newInstance(9999, 72);
     TezDAGID dagId = TezDAGID.getInstance(appId, 1);
     TezVertexID vId = TezVertexID.getInstance(dagId, 35);
@@ -97,7 +104,6 @@ public class TestTaskExecutorService2 {
                 .setDagName("MockDag")
                 .setFragmentNumber(fragmentNumber)
                 .setVertexName("MockVertex")
-                .setVertexParallelism(parallelism)
                 .setProcessorDescriptor(
                     EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build())
                 .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost")
@@ -109,6 +115,9 @@ public class TestTaskExecutorService2 {
             .newBuilder()
             .setDagStartTime(dagStartTime)
             .setFirstAttemptStartTime(attemptStartTime)
+            .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks)
+            .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete)
+            .setWithinDagPriority(withinDagPriority)
             .build())
         .build();
   }
@@ -270,6 +279,43 @@ public class TestTaskExecutorService2 {
     assertEquals(r2, queue.take());
   }
 
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+  @Test(timeout = 5000)
+  public void testWaitQueueComparatorParallelism() throws InterruptedException {
+    TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000);
+    TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000);
+    TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000);
+
+    EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
+        new TaskExecutorService.ShortestJobFirstComparator(), 4);
+
+    assertNull(queue.offer(r1));
+    assertNull(queue.offer(r2));
+    assertNull(queue.offer(r3));
+
+    assertEquals(r2, queue.take());
+    assertEquals(r3, queue.take());
+    assertEquals(r1, queue.take());
+  }
+
+
   private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) {
     MockRequest mockRequest = new MockRequest(request, canFinish, workTime);
     TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);