You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 20:59:20 UTC
[52/70] [abbrv] hive git commit: HIVE-11687. TaskExecutorService can
reject work even if capacity is available. (Siddharth Seth,
reviewed by Prasanth Jayachandran)
HIVE-11687. TaskExecutorService can reject work even if capacity is available. (Siddharth Seth, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/29e671ed
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/29e671ed
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/29e671ed
Branch: refs/heads/hive-14535
Commit: 29e671ed9608ce1577b3b16b0290d022bf39d950
Parents: 8230b57
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 6 13:23:58 2017 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 6 13:23:58 2017 -0800
----------------------------------------------------------------------
.../SchedulerFragmentCompletingListener.java | 29 +++++
.../llap/daemon/impl/ContainerRunnerImpl.java | 6 +-
.../impl/EvictingPriorityBlockingQueue.java | 69 +++++++---
.../hive/llap/daemon/impl/LlapTaskReporter.java | 38 +++++-
.../llap/daemon/impl/PriorityBlockingDeque.java | 4 +
.../llap/daemon/impl/TaskExecutorService.java | 111 ++++++++++++++--
.../llap/daemon/impl/TaskRunnerCallable.java | 24 +++-
.../llap/metrics/LlapDaemonExecutorInfo.java | 10 +-
.../llap/metrics/LlapDaemonExecutorMetrics.java | 64 +++++++++-
.../daemon/impl/TaskExecutorTestHelpers.java | 17 ++-
.../impl/TestEvictingPriorityBlockingQueue.java | 128 +++++++++++++++++++
.../daemon/impl/TestTaskExecutorService.java | 63 ++++++++-
.../TestFirstInFirstOutComparator.java | 90 ++++++-------
.../TestShortestJobFirstComparator.java | 92 ++++++-------
14 files changed, 608 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java
new file mode 100644
index 0000000..9f580f6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/SchedulerFragmentCompletingListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+public interface SchedulerFragmentCompletingListener {
+
+ enum State {
+ SUCCESS, FAILED, KILLED
+ }
+
+ /**
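+ * Indicates that a fragment is about to complete, along with the state it is completing in.
+ * @param fragmentId id of the fragment that is about to complete (callers pass the fragment's request id)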
+ */
+ void fragmentCompleting(String fragmentId, State state);
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 8c33fa2..6908138 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.impl.LlapTokenChecker.LlapTokenInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
@@ -93,6 +94,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private final AMReporter amReporter;
private final QueryTracker queryTracker;
private final Scheduler<TaskRunnerCallable> executorService;
+ private final SchedulerFragmentCompletingListener completionListener;
private final AtomicReference<InetSocketAddress> localAddress;
private final AtomicReference<Integer> localShufflePort;
private final Map<String, String> localEnv = new HashMap<>();
@@ -129,6 +131,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
this.executorService = new TaskExecutorService(numExecutors, waitQueueSize,
waitQueueSchedulerClassName, enablePreemption, classLoader, metrics);
+ completionListener = (SchedulerFragmentCompletingListener) executorService;
addIfService(executorService);
@@ -251,7 +254,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
new ExecutionContextImpl(localAddress.get().getHostName()), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
- this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi);
+ this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi,
+ completionListener);
submissionState = executorService.schedule(callable);
if (LOG.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index adc86ea..a80bb9b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -36,27 +36,28 @@ public class EvictingPriorityBlockingQueue<E> {
private final PriorityBlockingDeque<E> deque;
private final Comparator<E> comparator;
+ private final int waitQueueSize;
+
+ private int currentSize = 0;
public EvictingPriorityBlockingQueue(Comparator<E> comparator, int maxSize) {
- this.deque = new PriorityBlockingDeque<>(comparator, maxSize);
+ this.deque = new PriorityBlockingDeque<>(comparator);
+ this.waitQueueSize = maxSize;
this.comparator = comparator;
}
- public synchronized E offer(E e) {
- if (deque.offer(e)) {
+ public synchronized E offer(E e, int additionalElementsAllowed) {
+ if (currentSize < waitQueueSize + additionalElementsAllowed) {
+ // Capacity exists.
+ offerToDequeueInternal(e);
+ currentSize++;
return null;
} else {
+ // No capacity. Check if an element needs to be evicted.
E last = deque.peekLast();
if (comparator.compare(e, last) < 0) {
deque.removeLast();
- if (!deque.offer(e)) {
- LOG.error(
- "Failed to insert element into queue with capacity available. size={}, element={}",
- size(), e);
- throw new RuntimeException(
- "Failed to insert element into queue with capacity available. size=" +
- size());
- }
+ offerToDequeueInternal(e);
return last;
}
return e;
@@ -64,7 +65,7 @@ public class EvictingPriorityBlockingQueue<E> {
}
public synchronized boolean isEmpty() {
- return deque.isEmpty();
+ return currentSize == 0;
}
public synchronized E peek() {
@@ -72,19 +73,55 @@ public class EvictingPriorityBlockingQueue<E> {
}
public synchronized E take() throws InterruptedException {
- return deque.take();
+ E e = deque.take();
+ currentSize--; // Decrement only if an element was removed.
+ return e;
}
public synchronized boolean remove(E e) {
- return deque.remove(e);
+ boolean removed = deque.remove(e);
+ if (removed) {
+ currentSize--;
+ }
+ return removed;
+ }
+
+ /**
+ * Re-insert an element if it exists (mainly to force a re-order)
+ * @param e
+ * @return false if the element was not found. true otherwise.
+ */
+ public synchronized boolean reinsertIfExists(E e) {
+ if (remove(e)) {
+ offerToDequeueInternal(e);
+ currentSize++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void offerToDequeueInternal(E e) {
+ boolean result = deque.offer(e);
+ if (!result) {
+ LOG.error(
+ "Failed to insert element into queue with capacity available. size={}, element={}",
+ size(), e);
+ throw new RuntimeException(
+ "Failed to insert element into queue with capacity available. size=" +
+ size());
+ }
}
public synchronized int size() {
- return deque.size();
+ return currentSize;
}
@Override
public synchronized String toString() {
- return deque.toString();
+ StringBuilder sb = new StringBuilder();
+ sb.append("currentSize=").append(size()).append(", queue=")
+ .append(deque.toString());
+ return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 39b4b0e..2fe1017 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
@@ -82,15 +83,19 @@ public class LlapTaskReporter implements TaskReporterInterface {
private final String containerIdStr;
private final String fragmentId;
private final TezEvent initialEvent;
+ private final SchedulerFragmentCompletingListener completionListener;
+ // The same id as reported by TaskRunnerCallable.getRequestId
+ private final String fragmentRequestId;
private final ListeningExecutorService heartbeatExecutor;
@VisibleForTesting
HeartbeatCallable currentCallable;
- public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
+ public LlapTaskReporter(SchedulerFragmentCompletingListener completionListener, LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter,
- String containerIdStr, final String fragmentId, TezEvent initialEvent) {
+ String containerIdStr, final String fragmentId, TezEvent initialEvent,
+ String fragmentRequestId) {
this.umbilical = umbilical;
this.pollInterval = amPollInterval;
this.sendCounterInterval = sendCounterInterval;
@@ -102,6 +107,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
+ this.completionListener = completionListener;
+ this.fragmentRequestId = fragmentRequestId;
}
/**
@@ -113,8 +120,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
TezCounters tezCounters = task.addAndGetTezCounter(fragmentId);
FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters);
LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName());
- currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
- maxEventsToGet, requestCounter, containerIdStr, initialEvent);
+ currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval,
+ maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId);
ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
Futures.addCallback(future, new HeartbeatCallback(errorReporter));
}
@@ -144,6 +151,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
private final RuntimeTask task;
private final EventMetaData updateEventMetadata;
+ private final SchedulerFragmentCompletingListener completionListener;
+ private final String fragmentRequestId;
private final LlapTaskUmbilicalProtocol umbilical;
@@ -174,9 +183,12 @@ public class LlapTaskReporter implements TaskReporterInterface {
private int prevCounterSendHeartbeatNum = 0;
private TezEvent initialEvent;
- public HeartbeatCallable(RuntimeTask task, LlapTaskUmbilicalProtocol umbilical,
+ public HeartbeatCallable(
+ SchedulerFragmentCompletingListener completionListener,
+ RuntimeTask task, LlapTaskUmbilicalProtocol umbilical,
long amPollInterval, long sendCounterInterval, int maxEventsToGet,
- AtomicLong requestCounter, String containerIdStr, TezEvent initialEvent) {
+ AtomicLong requestCounter, String containerIdStr,
+ TezEvent initialEvent, String fragmentRequestId) {
this.pollInterval = amPollInterval;
this.sendCounterInterval = sendCounterInterval;
@@ -184,6 +196,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
this.requestCounter = requestCounter;
this.containerIdStr = containerIdStr;
this.initialEvent = initialEvent;
+ this.completionListener = completionListener;
+ this.fragmentRequestId = fragmentRequestId;
this.task = task;
this.umbilical = umbilical;
@@ -367,6 +381,10 @@ public class LlapTaskReporter implements TaskReporterInterface {
TezEvent statusUpdateEvent = new TezEvent(getStatusUpdateEvent(true), updateEventMetadata);
TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(),
updateEventMetadata);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invoking OOB heartbeat for successful attempt: {}, isTaskDone={}", taskAttemptID, task.isTaskDone());
+ }
+ completionListener.fragmentCompleting(fragmentRequestId, SchedulerFragmentCompletingListener.State.SUCCESS);
return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie;
} else {
LOG.warn("A final task state event has already been sent. Not sending again");
@@ -431,6 +449,14 @@ public class LlapTaskReporter implements TaskReporterInterface {
// Counter may exceed limitation
LOG.warn("Error when get constructing TaskStatusUpdateEvent. Not sending it out");
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Invoking OOB heartbeat for failed/killed attempt: {}, isTaskDone={}, isKilled={}",
+ taskAttemptID, task.isTaskDone(), isKilled);
+ }
+ completionListener.fragmentCompleting(fragmentRequestId,
+ isKilled ? SchedulerFragmentCompletingListener.State.KILLED :
+ SchedulerFragmentCompletingListener.State.FAILED);
return !heartbeat(tezEvents).shouldDie;
} else {
LOG.warn("A final task state event has already been sent. Not sending again");
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
index e27efa5..3bf51cd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
@@ -81,6 +81,10 @@ public class PriorityBlockingDeque<E>
this(null, capacity);
}
+ public PriorityBlockingDeque(Comparator<E> comparator) {
+ this(comparator, Integer.MAX_VALUE);
+ }
+
public PriorityBlockingDeque(Comparator<E> comparator, int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 58863af..dcad3d8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -42,10 +42,13 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.runtime.task.EndReason;
import org.apache.tez.runtime.task.TaskRunner2Result;
import org.slf4j.Logger;
@@ -78,7 +81,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* Task executor service can be shut down which will terminated all running tasks and reject all
* new tasks. Shutting down of the task executor service can be done gracefully or immediately.
*/
-public class TaskExecutorService extends AbstractService implements Scheduler<TaskRunnerCallable> {
+public class TaskExecutorService extends AbstractService
+ implements Scheduler<TaskRunnerCallable>, SchedulerFragmentCompletingListener {
private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
private static final boolean isInfoEnabled = LOG.isInfoEnabled();
private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -89,7 +93,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// Thread pool for actual execution of work.
private final ListeningExecutorService executorService;
- private final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
+ @VisibleForTesting
+ final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
// Thread pool for taking entities off the wait queue.
private final ListeningExecutorService waitQueueExecutorService;
// Thread pool for callbacks on completion of execution of a work unit.
@@ -100,6 +105,13 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
private final boolean enablePreemption;
private final ThreadPoolExecutor threadPoolExecutor;
private final AtomicInteger numSlotsAvailable;
+ private final int maxParallelExecutors;
+ private final Clock clock = new MonotonicClock();
+
+ // Tracks running fragments, and completing fragments.
+ // Completing since we have a race in the AM being notified and the task actually
+ // falling off, and the executor service being ready to schedule a new task.
+ private final AtomicInteger runningFragmentCount = new AtomicInteger(0);
@VisibleForTesting
@@ -121,6 +133,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
final Comparator<TaskWrapper> waitQueueComparator = createComparator(
waitQueueComparatorClassName);
+ this.maxParallelExecutors = numExecutors;
this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
numExecutors, // max pool size
@@ -344,8 +357,15 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the
// actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
// The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Offering to wait queue with: waitQueueSize={}, numSlotsAvailable={}, runningFragmentCount={} ",
+ waitQueue.size(), numSlotsAvailable.get(),
+ runningFragmentCount.get());
+ }
+
canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
- evictedTask = waitQueue.offer(taskWrapper);
+ evictedTask = waitQueue.offer(taskWrapper, maxParallelExecutors - runningFragmentCount.get());
// Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable
// null evicted task means offer accepted
@@ -366,7 +386,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
} else {
if (isInfoEnabled) {
- LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId());
+ LOG.info(
+ "wait queue full, size={}. numSlotsAvailable={}, runningFragmentCount={}. {} not added",
+ waitQueue.size(), numSlotsAvailable.get(), runningFragmentCount.get(), task.getRequestId());
}
evictedTask.getTaskRunnerCallable().killTask();
@@ -473,6 +495,34 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
+ private static final class FragmentCompletion {
+
+ public FragmentCompletion(
+ State state, long completingTime) {
+ this.state = state;
+ this.completingTime = completingTime;
+ }
+
+ State state;
+ long completingTime;
+ }
+
+ @VisibleForTesting
+ final ConcurrentMap<String, FragmentCompletion>
+ completingFragmentMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void fragmentCompleting(String fragmentId, State state) {
+ int count = runningFragmentCount.decrementAndGet();
+ if (count < 0) {
+ LOG.warn(
+ "RunningFragmentCount went negative. Multiple calls for the same completion. Resetting to 0");
+ runningFragmentCount.set(0);
+ }
+ completingFragmentMap
+ .put(fragmentId, new FragmentCompletion(state, clock.getTime()));
+ }
+
@VisibleForTesting
void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
@@ -481,6 +531,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
LOG.info("Attempting to execute {}", taskWrapper);
ListenableFuture<TaskRunner2Result> future = executorService.submit(
taskWrapper.getTaskRunnerCallable());
+ runningFragmentCount.incrementAndGet();
taskWrapper.setIsInWaitQueue(false);
FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(
taskWrapper);
@@ -547,10 +598,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
// Re-order the wait queue
LOG.debug("Re-ordering the wait queue since {} finishable state moved to {}",
taskWrapper.getRequestId(), newFinishableState);
- if (waitQueue.remove(taskWrapper)) {
- // Put element back only if it existed.
- waitQueue.offer(taskWrapper);
- } else {
+ boolean reInserted = waitQueue.reinsertIfExists(taskWrapper);
+ if (!reInserted) {
LOG.warn("Failed to remove {} from waitQueue",
taskWrapper.getTaskRunnerCallable().getRequestId());
}
@@ -653,6 +702,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
@Override
public void onSuccess(TaskRunner2Result result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received successful completion for: {}",
+ taskWrapper.getRequestId());
+ }
+ updateFallOffStats(taskWrapper.getRequestId());
knownTasks.remove(taskWrapper.getRequestId());
taskWrapper.setIsInPreemptableQueue(false);
taskWrapper.maybeUnregisterForFinishedStateNotifications();
@@ -662,6 +716,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
@Override
public void onFailure(Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received failed completion for: {}",
+ taskWrapper.getRequestId());
+ }
+ updateFallOffStats(taskWrapper.getRequestId());
knownTasks.remove(taskWrapper.getRequestId());
taskWrapper.setIsInPreemptableQueue(false);
taskWrapper.maybeUnregisterForFinishedStateNotifications();
@@ -698,6 +757,42 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
}
}
+ private void updateFallOffStats(
+ String requestId) {
+ long now = clock.getTime();
+ FragmentCompletion fragmentCompletion =
+ completingFragmentMap.remove(requestId);
+ if (fragmentCompletion == null) {
+ LOG.warn(
+ "Received onSuccess/onFailure for a fragment for which a completing message was not received: {}",
+ requestId);
+ // Happens due to AM side pre-emption, or the AM asking for a task to die.
+ // There are no hooks at the moment to get information over.
+ // For now - decrement the count to avoid accounting errors.
+ runningFragmentCount.decrementAndGet();
+ // TODO: Extend TaskRunner2 or see if an API with callbacks will work
+ } else {
+ long timeTaken = now - fragmentCompletion.completingTime;
+ switch (fragmentCompletion.state) {
+ case SUCCESS:
+ if (metrics != null) {
+ metrics.addMetricsFallOffSuccessTimeLost(timeTaken);
+ }
+ break;
+ case FAILED:
+ if (metrics != null) {
+ metrics.addMetricsFallOffFailedTimeLost(timeTaken);
+ }
+ break;
+ case KILLED:
+ if (metrics != null) {
+ metrics.addMetricsFallOffKilledTimeLost(timeTaken);
+ }
+ break;
+ }
+ }
+ }
+
}
public void shutDown(boolean awaitTermination) {
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index bfb155a..4b677aa 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -114,16 +115,17 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final AtomicBoolean killInvoked = new AtomicBoolean(false);
private final SignableVertexSpec vertex;
private final TezEvent initialEvent;
+ private final SchedulerFragmentCompletingListener completionListener;
private UserGroupInformation taskUgi;
@VisibleForTesting
public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
- Configuration conf, ExecutionContext executionContext, Map<String, String> envMap,
- Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams,
- LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
- FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
- TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
- UserGroupInformation taskUgi) {
+ Configuration conf, ExecutionContext executionContext, Map<String, String> envMap,
+ Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams,
+ LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
+ FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
+ TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
+ UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) {
this.request = request;
this.fragmentInfo = fragmentInfo;
this.conf = conf;
@@ -152,6 +154,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.tezHadoopShim = tezHadoopShim;
this.initialEvent = initialEvent;
this.taskUgi = taskUgi;
+ this.completionListener = completionListener;
}
public long getStartTime() {
@@ -219,6 +222,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
taskReporter = new LlapTaskReporter(
+ completionListener,
umbilical,
confParams.amHeartbeatIntervalMsMax,
confParams.amCounterHeartbeatInterval,
@@ -226,7 +230,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
new AtomicLong(0),
request.getContainerIdString(),
fragmentId,
- initialEvent);
+ initialEvent,
+ requestId);
String attemptId = fragmentInfo.getFragmentIdentifierString();
IOContextMap.setThreadAttemptId(attemptId);
@@ -297,9 +302,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
killtimerWatch.start();
LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
boolean killed = taskRunner.killTask();
+
if (killed) {
// Sending a kill message to the AM right here. Don't need to wait for the task to complete.
LOG.info("Kill request for task {} completed. Informing AM", ta);
+ // Inform the scheduler that this fragment has been killed.
+ // If the kill failed - that means the task has already hit a final condition,
+ // and a notification comes from the LlapTaskReporter
+ completionListener.fragmentCompleting(getRequestId(), SchedulerFragmentCompletingListener.State.KILLED);
reportTaskKilled();
} else {
LOG.info("Kill request for task {} did not complete because the task is already complete",
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index db5fd4f..69d1c6f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -51,7 +51,15 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo {
ExecutorPercentileTimeLost("Percentile cluster time wasted due to pre-emption"),
ExecutorMaxPreemptionTimeToKill("Max time for killing pre-empted task"),
ExecutorMaxPreemptionTimeLost("Max cluster time lost due to pre-emption"),
- ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority");
+ ExecutorTotalEvictedFromWaitQueue("Total number of tasks evicted from wait queue because of low priority"),
+ ExecutorFallOffSuccessTimeLost("Total time lost in an executor completing after informing the AM - successful fragments"),
+ ExecutorFallOffSuccessMaxTimeLost("Max value of time lost in an executor completing after informing the AM - successful fragments"),
+ ExecutorFallOffFailedTimeLost("Total time lost in an executor completing after informing the AM - failed fragments"),
+ ExecutorFallOffFailedMaxTimeLost("Max value of time lost in an executor completing after informing the AM - failed fragments"),
+ ExecutorFallOffKilledTimeLost("Total time lost in an executor completing after informing the AM - killed fragments"),
+ ExecutorFallOffKilledMaxTimeLost("Max value of time lost in an executor completing after informing the AM - killed fragments"),
+ ExecutorFallOffNumCompletedFragments("Number of completed fragments w.r.t falloff values"),
+ ;
private final String desc;
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 92c8913..7a0ecc9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.metrics;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlots;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorAvailableFreeSlotsPercent;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorCacheMemoryPerInstance;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffNumCompletedFragments;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorJvmMaxMemory;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxFreeSlots;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMaxPreemptionTimeLost;
@@ -41,6 +42,12 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.Executo
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeLost;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalPreemptionTimeToKill;
import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorWaitQueueSize;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffSuccessTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffSuccessMaxTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffFailedMaxTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledTimeLost;
+import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorFallOffKilledMaxTimeLost;
import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
@@ -83,6 +90,10 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
private long maxTimeLost = Long.MIN_VALUE;
private long maxTimeToKill = Long.MIN_VALUE;
+ private long fallOffMaxSuccessTimeLostLong = 0L;
+ private long fallOffMaxFailedTimeLostLong = 0L;
+ private long fallOffMaxKilledTimeLostLong = 0L;
+
private final Map<String, Integer> executorNames;
final MutableGaugeLong[] executorThreadCpuTime;
@@ -126,6 +137,23 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
@Metric
final MutableQuantiles[] percentileTimeLost;
+ @Metric
+ MutableCounterLong fallOffNumCompletedFragments;
+ @Metric
+ MutableCounterLong fallOffSuccessTimeLost;
+ @Metric
+ MutableCounterLong fallOffFailedTimeLost;
+ @Metric
+ MutableCounterLong fallOffKilledTimeLost;
+ @Metric
+ MutableGaugeLong fallOffMaxSuccessTimeLost;
+ @Metric
+ MutableGaugeLong fallOffMaxFailedTimeLost;
+ @Metric
+ MutableGaugeLong fallOffMaxKilledTimeLost;
+
+
+
private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId,
int numExecutors, final int[] intervals) {
this.name = displayName;
@@ -244,6 +272,33 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
}
}
+ /** Records the time lost by one successful fragment completing after informing the AM. */
+ public void addMetricsFallOffSuccessTimeLost(long timeLost) {
+ fallOffSuccessTimeLost.incr(timeLost);
+ fallOffNumCompletedFragments.incr();
+ if (timeLost <= fallOffMaxSuccessTimeLostLong) { return; }
+ fallOffMaxSuccessTimeLostLong = timeLost;
+ fallOffMaxSuccessTimeLost.set(timeLost);
+ }
+
+ /** Records the time lost by one failed fragment completing after informing the AM. */
+ public void addMetricsFallOffFailedTimeLost(long timeLost) {
+ fallOffFailedTimeLost.incr(timeLost);
+ fallOffNumCompletedFragments.incr();
+ if (timeLost <= fallOffMaxFailedTimeLostLong) { return; }
+ fallOffMaxFailedTimeLostLong = timeLost;
+ fallOffMaxFailedTimeLost.set(timeLost);
+ }
+
+ /** Records the time lost by one killed fragment completing after informing the AM. */
+ public void addMetricsFallOffKilledTimeLost(long timeLost) {
+ fallOffKilledTimeLost.incr(timeLost);
+ fallOffNumCompletedFragments.incr();
+ if (timeLost <= fallOffMaxKilledTimeLostLong) { return; }
+ fallOffMaxKilledTimeLostLong = timeLost;
+ fallOffMaxKilledTimeLost.set(timeLost);
+ }
+
public void incrExecutorTotalKilled() {
executorTotalIKilled.incr();
}
@@ -292,7 +347,14 @@ public class LlapDaemonExecutorMetrics implements MetricsSource {
.addCounter(ExecutorTotalPreemptionTimeToKill, totalPreemptionTimeToKill.value())
.addCounter(ExecutorTotalPreemptionTimeLost, totalPreemptionTimeLost.value())
.addGauge(ExecutorMaxPreemptionTimeToKill, maxPreemptionTimeToKill.value())
- .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value());
+ .addGauge(ExecutorMaxPreemptionTimeLost, maxPreemptionTimeLost.value())
+ .addCounter(ExecutorFallOffSuccessTimeLost, fallOffSuccessTimeLost.value())
+ .addGauge(ExecutorFallOffSuccessMaxTimeLost, fallOffMaxSuccessTimeLost.value())
+ .addCounter(ExecutorFallOffFailedTimeLost, fallOffFailedTimeLost.value())
+ .addGauge(ExecutorFallOffFailedMaxTimeLost, fallOffMaxFailedTimeLost.value())
+ .addCounter(ExecutorFallOffKilledTimeLost, fallOffKilledTimeLost.value())
+ .addGauge(ExecutorFallOffKilledMaxTimeLost, fallOffMaxKilledTimeLost.value())
+ .addCounter(ExecutorFallOffNumCompletedFragments, fallOffNumCompletedFragments.value());
for (MutableQuantiles q : percentileTimeToKill) {
q.snapshot(rb, true);
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 6506d07..5dc1be5 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -54,6 +55,18 @@ public class TaskExecutorTestHelpers {
return createMockRequest(canFinish, workTime, request);
}
+ /**
+ * Builds a MockRequest around a newly created SubmitWorkRequestProto, forwarding an
+ * explicit within-DAG priority along with the attempt start times.
+ */
+ public static MockRequest createMockRequest(int fragmentNum, int parallelism,
+ int withinDagPriority, long firstAttemptStartTime, long currentAttemptStartTime,
+ boolean canFinish, long workTime) {
+ SubmitWorkRequestProto submitWorkRequest = createSubmitWorkRequestProto(fragmentNum,
+ parallelism, 0, firstAttemptStartTime, currentAttemptStartTime, withinDagPriority);
+ return createMockRequest(canFinish, workTime, submitWorkRequest);
+ }
+
private static MockRequest createMockRequest(boolean canFinish,
long workTime, SubmitWorkRequestProto request) {
QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(
@@ -170,7 +183,8 @@ public class TaskExecutorTestHelpers {
LlapDaemonExecutorMetrics.class),
mock(KilledTaskHandler.class), mock(
FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
- requestProto.getWorkSpec().getVertex(), initialEvent, null);
+ requestProto.getWorkSpec().getVertex(), initialEvent, null, mock(
+ SchedulerFragmentCompletingListener.class));
this.workTime = workTime;
this.canFinish = canFinish;
}
@@ -285,4 +299,5 @@ public class TaskExecutorTestHelpers {
logInfo(message, null);
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java
new file mode 100644
index 0000000..62407b5
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestEvictingPriorityBlockingQueue.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Comparator;
+
+import org.junit.Test;
+
+public class TestEvictingPriorityBlockingQueue {
+ // Drives a queue of base capacity 3 through offers with and without additional capacity.
+ @Test (timeout = 10000)
+ public void test() throws InterruptedException {
+ Element e;
+ EvictingPriorityBlockingQueue<Element> queue = new EvictingPriorityBlockingQueue<>(new ElementComparator(), 3);
+
+ Element[] elements = new Element[10];
+ for (int i = 0 ; i < elements.length ; i++) {
+ elements[i] = new Element(i);
+ }
+
+ assertNull(queue.offer(elements[0], 0));
+ assertNull(queue.offer(elements[1], 0));
+ assertNull(queue.offer(elements[2], 0));
+ e = queue.offer(elements[3], 0);
+ assertEquals(elements[0], e);
+ // Element 0 was evicted above; offering it again to the full queue returns it right back.
+ e = queue.offer(elements[0], 0);
+ assertEquals(elements[0], e);
+ // 1,2,3
+
+ e = queue.offer(elements[4], 0);
+ assertEquals(elements[1], e);
+ //2,3,4
+ // With an additional capacity of 1, a fourth element is accepted without eviction.
+ e = queue.offer(elements[1], 1);
+ assertNull(e);
+ assertEquals(4, queue.size());
+ // 1,2,3,4
+
+ e = queue.take();
+ assertEquals(elements[4], e); //Highest priority at this point should have come out.
+ //1,2,3
+
+ e = queue.offer(elements[4], 1);
+ assertNull(e);
+ //1,2,3,4
+
+ e = queue.offer(elements[0], 1);
+ assertEquals(elements[0], e); // Rejected
+ //1,2,3,4
+
+ assertTrue(queue.reinsertIfExists(elements[2]));
+ assertEquals(4, queue.size());
+
+ assertFalse(queue.reinsertIfExists(elements[5]));
+ assertEquals(4, queue.size());
+
+ //1,2,3,4
+
+ e = queue.offer(elements[5], 1);
+ assertEquals(elements[1], e);
+ //2,3,4,5
+
+ assertNull(queue.offer(elements[1], 2));
+
+ assertNull(queue.offer(elements[6], 5));
+ assertNull(queue.offer(elements[7], 5));
+ //1,2,3,4,5,6,7
+
+ assertEquals(7, queue.size());
+ }
+
+ private static class Element {
+ public Element(int x) {
+ this.x = x;
+ }
+
+ final int x;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Element element = (Element) o;
+
+ return x == element.x;
+ }
+
+ @Override
+ public int hashCode() {
+ return x;
+ }
+ }
+
+ private static class ElementComparator implements Comparator<Element> {
+ // Orders elements with a larger x first. Integer.compare is used instead of
+ // subtracting the two values, since a subtraction can overflow for large inputs.
+ @Override
+ public int compare(Element o1, Element o2) {
+ return Integer.compare(o2.x, o1.x);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index de7f2fc..bf7d1d8 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest;
import org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator;
@@ -174,12 +175,65 @@ public class TestTaskExecutorService {
}
}
+ // Verifies that once a running fragment has reported completion to the AM (but has not yet
+ // given up its executor slot), the wait queue accepts one more fragment than it otherwise would.
+ @Test (timeout = 10000)
+ public void testWaitQueueAcceptAfterAMTaskReport() throws InterruptedException {
+
+ // NOTE(review): arguments are presumably (numExecutors, waitQueueSize, comparator, preemption) - confirm.
+ TaskExecutorServiceForTest service =
+ new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true);
+
+ // The fourth request is created with canFinish set to false, which makes it lower priority.
+ MockRequest req1 = createMockRequest(1, 1, 100, 200, true, 20000L);
+ MockRequest req2 = createMockRequest(2, 1, 1, 200, 2000, true, 20000L);
+ MockRequest req3 = createMockRequest(3, 1, 2, 300, 420, true, 20000L);
+ MockRequest req4 = createMockRequest(4, 1, 3, 400, 510, false, 20000L);
+
+ service.init(new Configuration());
+ service.start();
+ try {
+ // The first request is accepted; wait until it has actually started.
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, service.schedule(req1));
+ req1.awaitStart();
+
+ // Two more submissions are accepted, after which the fourth one is rejected.
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, service.schedule(req2));
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, service.schedule(req3));
+ assertEquals(Scheduler.SubmissionState.REJECTED, service.schedule(req4));
+
+ // Mark a fragment as completing, but don't actually complete it yet.
+ // The wait queue should now have capacity to accept one more fragment.
+ service.fragmentCompleting(req1.getRequestId(),
+ SchedulerFragmentCompletingListener.State.SUCCESS);
+
+ // Resubmitting the previously rejected request now succeeds.
+ assertEquals(Scheduler.SubmissionState.ACCEPTED, service.schedule(req4));
+
+ // Three fragments are queued, and one is tracked as completing.
+ assertEquals(3, service.waitQueue.size());
+ assertEquals(1, service.completingFragmentMap.size());
+
+ // Now let the first request really finish.
+ req1.complete();
+ req1.awaitEnd();
+
+ // req2 can only start once a fragment has completed.
+ // The completing-fragment map should be clear at this point.
+ awaitStartAndSchedulerRun(req2, service);
+ assertEquals(0, service.completingFragmentMap.size());
+
+ } finally {
+ service.shutDown(false);
+ }
+ }
+
@Test(timeout = 10000)
public void testWaitQueuePreemption() throws InterruptedException {
MockRequest r1 = createMockRequest(1, 1, 100, 200, true, 20000l);
- MockRequest r2 = createMockRequest(2, 1, 200, 330, false, 20000l);
- MockRequest r3 = createMockRequest(3, 1, 300, 420, false, 20000l);
- MockRequest r4 = createMockRequest(4, 1, 400, 510, false, 20000l);
+ MockRequest r2 = createMockRequest(2, 1, 1,200, 330, false, 20000l);
+ MockRequest r3 = createMockRequest(3, 2, 2,300, 420, false, 20000l);
+ MockRequest r4 = createMockRequest(4, 1, 3,400, 510, false, 20000l);
MockRequest r5 = createMockRequest(5, 1, 500, 610, true, 20000l);
TaskExecutorServiceForTest taskExecutorService =
@@ -190,8 +244,7 @@ public class TestTaskExecutorService {
try {
taskExecutorService.schedule(r1);
- // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots)
- // This currently serves to allow the task to be removed from the waitQueue.
+ // 1 scheduling run will happen, which may or may not pick up this task in the test.
awaitStartAndSchedulerRun(r1, taskExecutorService);
Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2);
assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState);
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
index 8cce0cb..79c2564 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java
@@ -107,16 +107,16 @@ public class TestFirstInFirstOutComparator {
TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r4, queue.peek());
// this offer will be accepted and r1 evicted
- assertEquals(r1, queue.offer(r5));
+ assertEquals(r1, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r4, queue.take());
assertEquals(r3, queue.take());
@@ -129,16 +129,16 @@ public class TestFirstInFirstOutComparator {
r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r4, queue.peek());
// this offer will be accpeted and r1 evicted
- assertEquals(r1, queue.offer(r5));
+ assertEquals(r1, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r4, queue.take());
assertEquals(r3, queue.take());
@@ -151,16 +151,16 @@ public class TestFirstInFirstOutComparator {
r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r3, queue.peek());
// offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
+ assertEquals(r2, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r3, queue.take());
assertEquals(r1, queue.take());
@@ -173,16 +173,16 @@ public class TestFirstInFirstOutComparator {
r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r3, queue.peek());
// offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
+ assertEquals(r2, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r3, queue.take());
assertEquals(r1, queue.take());
@@ -195,16 +195,16 @@ public class TestFirstInFirstOutComparator {
r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r1, queue.peek());
// offer accepted and r2 gets evicted
- assertEquals(r2, queue.offer(r5));
+ assertEquals(r2, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r1, queue.take());
assertEquals(r4, queue.take());
@@ -217,16 +217,16 @@ public class TestFirstInFirstOutComparator {
r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r4, queue.peek());
// offer accepted, r1 evicted
- assertEquals(r1, queue.offer(r5));
+ assertEquals(r1, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r4, queue.take());
assertEquals(r3, queue.take());
@@ -239,16 +239,16 @@ public class TestFirstInFirstOutComparator {
r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r3, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r4, queue.peek());
// offer accepted, r1 evicted
- assertEquals(r1, queue.offer(r5));
+ assertEquals(r1, queue.offer(r5, 0));
assertEquals(r5, queue.take());
assertEquals(r4, queue.take());
assertEquals(r3, queue.take());
@@ -264,9 +264,9 @@ public class TestFirstInFirstOutComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -282,13 +282,13 @@ public class TestFirstInFirstOutComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 3);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
// can not queue more requests as queue is full
TaskWrapper r4 = createTaskWrapper(createRequest(4, 1, 0, 10, 100, 10), true, 100000);
- assertEquals(r4, queue.offer(r4));
+ assertEquals(r4, queue.offer(r4, 0));
}
@Test(timeout = 60000)
@@ -300,9 +300,9 @@ public class TestFirstInFirstOutComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new FirstInFirstOutComparator(), 4);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
http://git-wip-us.apache.org/repos/asf/hive/blob/29e671ed/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index 0059d0c..b348bd6 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -35,16 +35,16 @@ public class TestShortestJobFirstComparator {
TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), false, 1000000);
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r1, queue.peek());
// this offer will be rejected
- assertEquals(r5, queue.offer(r5));
+ assertEquals(r5, queue.offer(r5, 0));
assertEquals(r1, queue.take());
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -57,16 +57,16 @@ public class TestShortestJobFirstComparator {
r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r1, queue.peek());
// this offer will be rejected
- assertEquals(r5, queue.offer(r5));
+ assertEquals(r5, queue.offer(r5, 0));
assertEquals(r1, queue.take());
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -79,16 +79,16 @@ public class TestShortestJobFirstComparator {
r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r1, queue.peek());
// offer accepted and r4 gets evicted
- assertEquals(r4, queue.offer(r5));
+ assertEquals(r4, queue.offer(r5, 0));
assertEquals(r1, queue.take());
assertEquals(r3, queue.take());
assertEquals(r5, queue.take());
@@ -101,16 +101,16 @@ public class TestShortestJobFirstComparator {
r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r1, queue.peek());
// offer accepted and r4 gets evicted
- assertEquals(r4, queue.offer(r5));
+ assertEquals(r4, queue.offer(r5, 0));
assertEquals(r1, queue.take());
assertEquals(r3, queue.take());
assertEquals(r5, queue.take());
@@ -123,16 +123,16 @@ public class TestShortestJobFirstComparator {
r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r1, queue.peek());
// offer accepted and r4 gets evicted
- assertEquals(r4, queue.offer(r5));
+ assertEquals(r4, queue.offer(r5, 0));
assertEquals(r1, queue.take());
assertEquals(r5, queue.take());
assertEquals(r2, queue.take());
@@ -145,16 +145,16 @@ public class TestShortestJobFirstComparator {
r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500, 600, "q5"), true, 1000000);
queue = new EvictingPriorityBlockingQueue(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
+ assertNull(queue.offer(r1, 0));
assertEquals(r1, queue.peek());
- assertNull(queue.offer(r2));
+ assertNull(queue.offer(r2, 0));
assertEquals(r2, queue.peek());
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r3, 0));
assertEquals(r2, queue.peek());
- assertNull(queue.offer(r4));
+ assertNull(queue.offer(r4, 0));
assertEquals(r2, queue.peek());
// offer accepted, r1 evicted
- assertEquals(r1, queue.offer(r5));
+ assertEquals(r1, queue.offer(r5, 0));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
assertEquals(r4, queue.take());
@@ -170,9 +170,9 @@ public class TestShortestJobFirstComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -188,13 +188,13 @@ public class TestShortestJobFirstComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 3);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
// can not queue more requests as queue is full
TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 0, 10, 100, 10), true, 100000);
- assertEquals(r4, queue.offer(r4));
+ assertEquals(r4, queue.offer(r4, 0));
}
@Test(timeout = 60000)
@@ -206,9 +206,9 @@ public class TestShortestJobFirstComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -224,9 +224,9 @@ public class TestShortestJobFirstComparator {
EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>(
new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
assertEquals(r1, queue.take());
assertEquals(r2, queue.take());
@@ -241,9 +241,9 @@ public class TestShortestJobFirstComparator {
queue = new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 4);
- assertNull(queue.offer(r1));
- assertNull(queue.offer(r2));
- assertNull(queue.offer(r3));
+ assertNull(queue.offer(r1, 0));
+ assertNull(queue.offer(r2, 0));
+ assertNull(queue.offer(r3, 0));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());