You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ss...@apache.org on 2016/04/29 11:47:03 UTC
hive git commit: HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 076f3655b -> 347a5a558
HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/347a5a55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/347a5a55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/347a5a55
Branch: refs/heads/master
Commit: 347a5a5580742a36a875bd6a5f2ac8acd74d3cbf
Parents: 076f365
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Apr 29 15:14:27 2016 +0530
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Apr 29 15:14:27 2016 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/llap/tezplugins/ContainerFactory.java | 3 +-
.../tezplugins/LlapTaskSchedulerService.java | 377 ++++++++--
.../llap/tezplugins/helpers/MonotonicClock.java | 24 +
.../scheduler/LoggingFutureCallback.java | 44 ++
.../TestLlapTaskSchedulerService.java | 734 ++++++++++++++++++-
6 files changed, 1068 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 566e9b6..fd725cb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2762,7 +2762,7 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true),
"Amount of time to wait before allocating a request which contains location information," +
" to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
- "for a no delay. Currently these are the only two supported values"
+ " for no delay."
),
LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS(
"hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300",
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
index a314391..f1feec7 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -37,11 +37,10 @@ class ContainerFactory {
}
public Container createContainer(Resource capability, Priority priority, String hostname,
- int port) {
+ int port, String nodeHttpAddress) {
ContainerId containerId =
ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance(hostname, port);
- String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
Container container =
Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index c3d3a1d..da1e17f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -50,7 +50,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
@@ -62,6 +61,8 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.yarn.api.records.Container;
@@ -77,7 +78,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
@@ -91,6 +91,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
+ private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
private final Configuration conf;
// interface into the registry service
@@ -104,6 +106,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Tracks tasks which could not be allocated immediately.
@VisibleForTesting
+ // Tasks are tracked in the order requests come in, at different priority levels.
+ // TODO HIVE-13538 For tasks at the same priority level, it may be worth attempting to schedule tasks with
+ // locality information before those without locality information
final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new Comparator<Priority>() {
@Override
public int compare(Priority o1, Priority o2) {
@@ -113,23 +118,30 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Tracks running and queued tasks. Cleared after a task completes.
private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
+ // Tracks tasks which are running. Useful for selecting a task to preempt based on when it started.
private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>();
- private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
// Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
@VisibleForTesting
final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
+ @VisibleForTesting
+ final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<>();
- private final boolean forceLocation;
private final ContainerFactory containerFactory;
private final Random random = new Random();
- private final Clock clock;
+ @VisibleForTesting
+ final Clock clock;
private final ListeningExecutorService nodeEnabledExecutor;
private final NodeEnablerCallable nodeEnablerCallable =
new NodeEnablerCallable();
+ private final ListeningExecutorService delayedTaskSchedulerExecutor;
+ @VisibleForTesting
+ final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
@@ -147,6 +159,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>();
private final NodeBlacklistConf nodeBlacklistConf;
+ private final LocalityDelayConf localityDelayConf;
// Per daemon
private final int memoryPerInstance;
@@ -168,6 +181,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final LlapRegistryService registry = new LlapRegistryService(false);
private volatile ListenableFuture<Void> nodeEnablerFuture;
+ private volatile ListenableFuture<Void> delayedTaskSchedulerFuture;
private volatile ListenableFuture<Void> schedulerFuture;
@VisibleForTesting
@@ -181,7 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final JvmPauseMonitor pauseMonitor;
public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
- this(taskSchedulerContext, new SystemClock(), true);
+ this(taskSchedulerContext, new MonotonicClock(), true);
}
@VisibleForTesting
@@ -189,6 +203,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
boolean initMetrics) {
super(taskSchedulerContext);
this.clock = clock;
+ this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable();
try {
this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
} catch (IOException e) {
@@ -197,6 +212,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
+ // TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances
+ // publishing potentially different values when we support changing configurations dynamically.
+ // For now, this can simply be fetched from a single registry instance.
this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
@@ -212,11 +230,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
long localityDelayMs = HiveConf
.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS);
- if (localityDelayMs == -1) {
- this.forceLocation = true;
- } else {
- this.forceLocation = false;
- }
+
+ this.localityDelayConf = new LocalityDelayConf(localityDelayMs);
this.timeoutMonitor = new SchedulerTimeoutMonitor();
this.timeout = HiveConf.getTimeVar(conf,
@@ -240,6 +255,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
+ ExecutorService delayedTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerDelayedTaskHandler")
+ .build());
+ delayedTaskSchedulerExecutor =
+ MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
+
ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
@@ -266,7 +287,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
+ ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
+ executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
+ ", nodeBlacklistConf=" + nodeBlacklistConf
- + ", forceLocation=" + forceLocation);
+ + ", localityDelayMs=" + localityDelayMs);
}
@Override
@@ -279,29 +300,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
writeLock.lock();
try {
nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
- Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- LOG.info("NodeEnabledThread exited");
- }
+ Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
+
+ delayedTaskSchedulerFuture =
+ delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable);
+ Futures.addCallback(delayedTaskSchedulerFuture,
+ new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG));
- @Override
- public void onFailure(Throwable t) {
- LOG.warn("NodeEnabledThread exited with error", t);
- }
- });
schedulerFuture = schedulerExecutor.submit(schedulerCallable);
- Futures.addCallback(schedulerFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- LOG.info("SchedulerThread exited");
- }
+ Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG));
- @Override
- public void onFailure(Throwable t) {
- LOG.warn("SchedulerThread exited with error", t);
- }
- });
registry.start();
registry.registerStateChangeListener(new NodeStateChangeListener());
activeInstances = registry.getInstances();
@@ -399,6 +407,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
timeoutExecutor.shutdownNow();
+ delayedTaskSchedulerCallable.shutdown();
+ if (delayedTaskSchedulerFuture != null) {
+ delayedTaskSchedulerFuture.cancel(true);
+ }
+ delayedTaskSchedulerExecutor.shutdownNow();
+
schedulerCallable.shutdown();
if (schedulerFuture != null) {
schedulerFuture.cancel(true);
@@ -502,6 +516,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@Override
public void blacklistNode(NodeId nodeId) {
LOG.info("BlacklistNode not supported");
+ // TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted.
}
@Override
@@ -513,7 +528,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
Priority priority, Object containerSignature, Object clientCookie) {
TaskInfo taskInfo =
- new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime());
+ new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime());
writeLock.lock();
try {
dagStats.registerTaskRequest(hosts, racks);
@@ -530,7 +545,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Container affinity can be implemented as Host affinity for LLAP. Not required until
// 1:1 edges are used in Hive.
TaskInfo taskInfo =
- new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime());
+ new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime());
writeLock.lock();
try {
dagStats.registerTaskRequest(null, null);
@@ -558,7 +573,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return false;
}
if (taskInfo.containerId == null) {
- if (taskInfo.assigned) {
+ if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
LOG.error("Task: "
+ task
+ " assigned, but could not find the corresponding containerId."
@@ -577,7 +592,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
assert nodeInfo != null;
// Re-enable the node if preempted
- if (taskInfo.preempted) {
+ if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
nodeInfo.registerUnsuccessfulTaskEnd(true);
@@ -607,7 +622,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// In case of success, trigger a scheduling run for pending tasks.
trySchedulingPendingTasks();
- } else if (!taskSucceeded) {
+ } else { // Task Failed
nodeInfo.registerUnsuccessfulTaskEnd(false);
if (endReason != null && EnumSet
.of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
@@ -665,17 +680,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return true;
}
- private ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
- }
-
/**
* @param request the list of preferred hosts. null implies any host
* @return
*/
private SelectHostResult selectHost(TaskInfo request) {
String[] requestedHosts = request.requestedHosts;
+ long schedulerAttemptTime = clock.getTime();
readLock.lock(); // Read-lock. Not updating any stats at the moment.
try {
// If there's no memory available, fail
@@ -683,32 +694,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
}
+ boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime);
if (requestedHosts != null && requestedHosts.length > 0) {
int prefHostCount = -1;
- boolean requestedHostExists = false;
+ boolean requestedHostsWillBecomeAvailable = false;
for (String host : requestedHosts) {
prefHostCount++;
// Pick the first host always. Weak attempt at cache affinity.
Set<ServiceInstance> instances = activeInstances.getByHost(host);
if (!instances.isEmpty()) {
- requestedHostExists = true;
for (ServiceInstance inst : instances) {
NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
- if (nodeInfo != null && nodeInfo.canAcceptTask()) {
- LOG.info("Assigning " + inst + " when looking for " + host + "." +
- " FirstRequestedHost=" + (prefHostCount == 0) +
- (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : ""));
- return new SelectHostResult(inst, nodeInfo);
+ if (nodeInfo != null) {
+ if (nodeInfo.canAcceptTask()) {
+ // Successfully scheduled.
+ LOG.info(
+ "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host +
+ ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
+ (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
+ ""));
+ return new SelectHostResult(inst, nodeInfo);
+ } else {
+ // The node cannot accept a task at the moment.
+ if (shouldDelayForLocality) {
+ // Perform some checks on whether the node will become available or not.
+ if (request.shouldForceLocality()) {
+ requestedHostsWillBecomeAvailable = true;
+ } else {
+ if (nodeInfo.getEnableTime() > request.getLocalityDelayTimeout() &&
+ nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) {
+ // This node will likely be activated after the task timeout expires.
+ } else {
+ // Worth waiting for the timeout.
+ requestedHostsWillBecomeAvailable = true;
+ }
+ }
+ }
+ }
+ } else {
+ LOG.warn(
+ "Null NodeInfo when attempting to get host with worker identity {}, and host {}",
+ inst.getWorkerIdentity(), host);
+ // Leave requestedHostsWillBecomeAvailable as is. If some other requested host will become available, delay;
+ // otherwise the task ends up being allocated to a random host immediately.
}
}
}
}
// Check if forcing the location is required.
- if (forceLocation) {
- if (requestedHostExists) {
+ if (shouldDelayForLocality) {
+ if (requestedHostsWillBecomeAvailable) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping non-local location allocation for [" + request.task +
- "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]");
+ "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" +
+ ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" +
+ request.getLocalityDelayTimeout());
}
return SELECT_HOST_RESULT_DELAYED_LOCALITY;
} else {
@@ -729,10 +769,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
for (int i = 0; i < all.length; i++) {
Entry<String, NodeInfo> inst = all[(i + n) % all.length];
if (inst.getValue().canAcceptTask()) {
- LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length +
- ", requestedHosts=" +
- ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
- Arrays.toString(requestedHosts)));
+ LOG.info(
+ "Assigning " + nodeToString(inst.getValue().getServiceInstance(), inst.getValue()) +
+ " when looking for any host, from #hosts=" + all.length + ", requestedHosts=" +
+ ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+ Arrays.toString(requestedHosts)));
return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue());
}
}
@@ -820,6 +861,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
tasksAtPriority = new LinkedList<>();
pendingTasks.put(taskInfo.priority, tasksAtPriority);
}
+ // Delayed tasks will not kick in right now. That will happen in the scheduling loop.
tasksAtPriority.add(taskInfo);
knownTasks.putIfAbsent(taskInfo.task, taskInfo);
if (metrics != null) {
@@ -870,7 +912,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
try {
TaskInfo taskInfo = knownTasks.remove(task);
if (taskInfo != null) {
- if (taskInfo.assigned) {
+ if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
// Remove from the running list.
int priority = taskInfo.priority.getPriority();
Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
@@ -925,6 +967,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
taskInfo.triedAssigningTask();
ScheduleResult scheduleResult = scheduleTask(taskInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
+ }
if (scheduleResult == ScheduleResult.SCHEDULED) {
taskIter.remove();
} else {
@@ -938,6 +983,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Preempt only if there's no pending preemptions to avoid preempting twice for a task.
String[] potentialHosts;
if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
+
+ // Add the task to the delayed task queue if it does not already exist.
+ maybeAddToDelayedTaskQueue(taskInfo);
+
+ // Try preempting a lower priority task in any case.
// preempt only on specific hosts, if no preemptions already exist on those.
potentialHosts = taskInfo.requestedHosts;
//Protect against a bad location being requested.
@@ -1008,7 +1058,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
Container container =
containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
nsPair.getServiceInstance().getHost(),
- nsPair.getServiceInstance().getRpcPort());
+ nsPair.getServiceInstance().getRpcPort(),
+ nsPair.getServiceInstance().getServicesAddress());
writeLock.lock(); // While updating local structures
try {
LOG.info("Assigned task {} to container {}", taskInfo, container.getId());
@@ -1125,9 +1176,81 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
+ // There's no point adding a task with forceLocality set - since that will never exit the queue.
+ // Add other tasks if they are not already in the queue.
+ if (!taskInfo.shouldForceLocality() && !taskInfo.isInDelayedQueue()) {
+ taskInfo.setInDelayedQueue(true);
+ delayedTaskQueue.add(taskInfo);
+ }
+ }
+
+ private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+ return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" +
+ serviceInstance.getWorkerIdentity() + ", webAddress=" +
+ serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks;
+ }
+
+
+
+ // ------ Inner classes defined after this point ------
+
+ @VisibleForTesting
+ class DelayedTaskSchedulerCallable implements Callable<Void> {
+
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ @Override
+ public Void call() {
+ while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ TaskInfo taskInfo = getNextTask();
+ taskInfo.setInDelayedQueue(false);
+ // Tasks can exist in the delayed queue even after they have been scheduled.
+ // Trigger scheduling only if the task is still in PENDING state.
+ processEvictedTask(taskInfo);
+
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info("DelayedTaskScheduler thread interrupted after shutdown");
+ break;
+ } else {
+ LOG.warn("DelayedTaskScheduler thread interrupted before being shutdown");
+ throw new RuntimeException(
+ "DelayedTaskScheduler thread interrupted without being shutdown", e);
+ }
+ }
+ }
+ return null;
+ }
+
+ public void shutdown() {
+ isShutdown.set(true);
+ }
+
+ public TaskInfo getNextTask() throws InterruptedException {
+ return delayedTaskQueue.take();
+ }
+
+ public void processEvictedTask(TaskInfo taskInfo) {
+ if (shouldScheduleTask(taskInfo)) {
+ trySchedulingPendingTasks();
+ }
+ }
+
+ public boolean shouldScheduleTask(TaskInfo taskInfo) {
+ return taskInfo.getState() == TaskInfo.State.PENDING;
+ }
+ }
+
+ @VisibleForTesting
+ DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+ return new DelayedTaskSchedulerCallable();
+ }
+
private class NodeEnablerCallable implements Callable<Void> {
- private AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private static final long REFRESH_INTERVAL = 10000l;
long nextPollInterval = REFRESH_INTERVAL;
long lastRefreshTime;
@@ -1135,13 +1258,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@Override
public Void call() {
- lastRefreshTime = System.currentTimeMillis();
+ lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
try {
while (true) {
NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
if (nodeInfo != null) {
- long currentTime = System.currentTimeMillis();
+ long currentTime = LlapTaskSchedulerService.this.clock.getTime();
// A node became available. Enable the node and try scheduling.
reenableDisabledNode(nodeInfo);
trySchedulingPendingTasks();
@@ -1152,7 +1275,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (nextPollInterval < 0 || nodeInfo == null) {
// timeout expired. Reset the poll interval and refresh nodes.
nextPollInterval = REFRESH_INTERVAL;
- lastRefreshTime = System.currentTimeMillis();
+ lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
// TODO Get rid of this polling once we have notificaitons from the registry sub-system
if (LOG.isDebugEnabled()) {
LOG.debug("Refreshing instances based on poll interval");
@@ -1232,6 +1355,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// will be handled in the next run.
// A new request may come in right after this is set to false, but before the actual scheduling.
// This will be handled in this run, but will cause an immediate run after, which is harmless.
+ // This is mainly to handle a trySchedule request made in the middle of a run, since the event
+ // which triggered it may not have been processed for all tasks in that run.
pendingScheduleInvocations.set(false);
// Schedule outside of the scheduleLock - which should only be used to wait on the condition.
schedulePendingTasks();
@@ -1245,6 +1370,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ // ------ Additional static classes defined after this point ------
+
@VisibleForTesting
static class NodeInfo implements Delayed {
private final NodeBlacklistConf blacklistConf;
@@ -1257,6 +1384,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
float cumulativeBackoffFactor = 1.0f;
// Indicates whether a node had a recent communication failure.
+ // This is primarily for tracking and logging purposes for the moment.
+ // TODO At some point, treat task rejection and communication failures differently.
private boolean hadCommFailure = false;
// Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
@@ -1375,6 +1504,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ /**
+ * @return the time at which this node will be re-enabled
+ */
+ public long getEnableTime() {
+ return expireTimeMillis;
+ }
+
public boolean isDisabled() {
return disabled;
}
@@ -1382,13 +1518,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public boolean hadCommFailure() {
return hadCommFailure;
}
+
/* Returning true does not guarantee that the task will run, considering other queries
may be running in the system. Also depends upon the capacity usage configuration
*/
public boolean canAcceptTask() {
boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
&&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
- LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+ serviceInstance.getWorkerIdentity() + "]: " +
+ "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}",
+ result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled,
+ serviceInstance.isAlive());
+ }
return result;
}
@@ -1512,11 +1655,23 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
- private static class TaskInfo {
+
+ // TODO There needs to be a mechanism to figure out different attempts for the same task. Delays
+ // could potentially be changed based on this.
+
+ @VisibleForTesting
+ static class TaskInfo implements Delayed {
+
+ enum State {
+ PENDING, ASSIGNED, PREEMPTED
+ }
+
// IDs used to ensure two TaskInfos are different without using the underlying task instance.
// Required for insertion into a TreeMap
static final AtomicLong ID_GEN = new AtomicLong(0);
final long uniqueId;
+ final LocalityDelayConf localityDelayConf;
+ final Clock clock;
final Object task;
final Object clientCookie;
final Priority priority;
@@ -1524,19 +1679,22 @@ public class LlapTaskSchedulerService extends TaskScheduler {
final String[] requestedHosts;
final String[] requestedRacks;
final long requestTime;
+ final long localityDelayTimeout;
long startTime;
long preemptTime;
ContainerId containerId;
ServiceInstance assignedInstance;
- private boolean assigned = false;
- private boolean preempted = false;
+ private State state = State.PENDING;
+ boolean inDelayedQueue = false;
private int numAssignAttempts = 0;
// TaskInfo instances for two different tasks will not be the same. Only a single instance should
// ever be created for a taskAttempt
- public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
+ public TaskInfo(LocalityDelayConf localityDelayConf, Clock clock, Object task, Object clientCookie, Priority priority, Resource capability,
String[] hosts, String[] racks, long requestTime) {
+ this.localityDelayConf = localityDelayConf;
+ this.clock = clock;
this.task = task;
this.clientCookie = clientCookie;
this.priority = priority;
@@ -1544,30 +1702,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
this.requestedHosts = hosts;
this.requestedRacks = racks;
this.requestTime = requestTime;
+ if (localityDelayConf.getNodeLocalityDelay() == -1) {
+ localityDelayTimeout = Long.MAX_VALUE;
+ } else if (localityDelayConf.getNodeLocalityDelay() == 0) {
+ localityDelayTimeout = 0L;
+ } else {
+ localityDelayTimeout = requestTime + localityDelayConf.getNodeLocalityDelay();
+ }
this.uniqueId = ID_GEN.getAndIncrement();
}
- void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
+ /**
+ * Records where the task was placed (instance, container, start time) and moves it to
+ * {@link State#ASSIGNED}. Synchronized like the other state accessors of this class.
+ */
+ synchronized void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
this.assignedInstance = instance;
- this.containerId = containerId;
+ this.containerId = containerId;
this.startTime = startTime;
- assigned = true;
+ this.state = State.ASSIGNED;
}
- void setPreemptedInfo(long preemptTime) {
- this.preempted = true;
- this.assigned = false;
+ /**
+ * Moves this task to {@link State#PREEMPTED} and stores the supplied preemptTime. The single state
+ * value replaces the former assigned/preempted flag pair, so PREEMPTED also means "not ASSIGNED".
+ */
+ synchronized void setPreemptedInfo(long preemptTime) {
+ this.state = State.PREEMPTED;
this.preemptTime = preemptTime;
}
- void triedAssigningTask() {
+ /** Flags whether this task is currently held in the scheduler's locality-delay queue. */
+ synchronized void setInDelayedQueue(boolean val) {
+ inDelayedQueue = val;
+ }
+
+ /** Counts one more attempt to place this task; read back via getNumPreviousAssignAttempts(). */
+ synchronized void triedAssigningTask() {
numAssignAttempts++;
}
- int getNumPreviousAssignAttempts() {
+ /** Returns the number of recorded placement attempts (incremented by triedAssigningTask()). */
+ synchronized int getNumPreviousAssignAttempts() {
return numAssignAttempts;
}
+ /** Returns the current lifecycle state, read under this instance's monitor. */
+ synchronized State getState() {
+ return this.state;
+ }
+
+ /** Returns the flag last set by setInDelayedQueue(), read under this instance's monitor. */
+ synchronized boolean isInDelayedQueue() {
+ return this.inDelayedQueue;
+ }
+
+ /**
+ * Returns true if, at {@code schedulerAttemptTime}, this task should still hold out for a local
+ * placement instead of accepting a non-local one.
+ *
+ * <p>A configured delay of 0 means "never wait" and is checked explicitly rather than through the
+ * 0 sentinel stored in localityDelayTimeout: monotonic clock readings (derived from
+ * System.nanoTime) may be negative, and {@code 0 > negativeTime} would wrongly report a delay.
+ * For other configurations this agrees with getDelay(): the wait ends once the delay is <= 0.
+ */
+ boolean shouldDelayForLocality(long schedulerAttemptTime) {
+ if (localityDelayConf.getNodeLocalityDelay() == 0) {
+ return false;
+ }
+ return localityDelayTimeout > schedulerAttemptTime;
+ }
+
+ /**
+ * True when locality is forced, i.e. the configured delay was -1 and the timeout was set to the
+ * never-expiring Long.MAX_VALUE sentinel.
+ */
+ boolean shouldForceLocality() {
+ return Long.MAX_VALUE == localityDelayTimeout;
+ }
+
+ /**
+ * Returns the time at which the locality wait ends: Long.MAX_VALUE for forced locality, 0 when no
+ * delay is configured, otherwise the request time plus the configured delay.
+ */
+ long getLocalityDelayTimeout() {
+ return this.localityDelayTimeout;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -1602,8 +1791,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
", containerId=" + containerId +
", assignedInstance=" + assignedInstance +
", uniqueId=" + uniqueId +
+ ", localityDelayTimeout=" + localityDelayTimeout +
'}';
}
+
+ /**
+ * Remaining locality wait as required by {@link Delayed}; a value <= 0 lets a DelayQueue release
+ * this task.
+ *
+ * <p>For forced locality the timeout is Long.MAX_VALUE. Computing {@code MAX_VALUE - now} would
+ * overflow to a large negative delay whenever the monotonic clock reads negative, making a task
+ * that must never time out look already expired, so that case returns the maximum directly.
+ */
+ @Override
+ public long getDelay(TimeUnit unit) {
+ if (shouldForceLocality()) {
+ return unit.convert(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+ return unit.convert(localityDelayTimeout - clock.getTime(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Orders tasks by locality timeout, earliest first, so a DelayQueue's head is the task whose wait
+ * expires soonest. Distinct tasks with equal timeouts compare as 0, so this ordering is not
+ * consistent with equals(); DelayQueue tolerates that because remove(Object) matches by equals().
+ */
+ @Override
+ public int compareTo(Delayed o) {
+ TaskInfo other = (TaskInfo) o;
+ return Long.compare(this.localityDelayTimeout, other.localityDelayTimeout);
+ }
}
// Newer tasks first.
@@ -1689,4 +1896,24 @@ public class LlapTaskSchedulerService extends TaskScheduler {
'}';
}
}
+
+ /**
+ * Immutable holder for the scheduler's locality-delay setting.
+ *
+ * <p>TaskInfo interprets nodeLocalityDelay as follows: -1 forces locality (the timeout never
+ * expires), 0 disables the wait, and any other value is added to the request time to form the
+ * locality timeout. The value is presumably in milliseconds, matching how TaskInfo converts it.
+ */
+ @VisibleForTesting
+ static final class LocalityDelayConf {
+ /** Configured node-locality delay; see the class comment for the special values -1 and 0. */
+ private final long nodeLocalityDelay;
+
+ public LocalityDelayConf(long nodeLocalityDelay) {
+ this.nodeLocalityDelay = nodeLocalityDelay;
+ }
+
+ public long getNodeLocalityDelay() {
+ return nodeLocalityDelay;
+ }
+
+ @Override
+ public String toString() {
+ return "LocalityDelayConf{nodeLocalityDelay=" + nodeLocalityDelay + "}";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
new file mode 100644
index 0000000..aaa6f91
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.util.Clock;
+
+/**
+ * {@link Clock} that reads Hadoop's {@link Time#monotonicNow()} instead of wall-clock time.
+ *
+ * <p>Readings are only meaningful as differences (elapsed time). They are not tied to the epoch
+ * and, since Hadoop derives them from {@link System#nanoTime()}, they can be negative.
+ */
+public class MonotonicClock implements Clock {
+ /** Returns the current monotonic time in milliseconds; unaffected by system clock changes. */
+ @Override
+ public long getTime() {
+ return Time.monotonicNow();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
new file mode 100644
index 0000000..ea700da
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.scheduler;
+
+import java.util.concurrent.CancellationException;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.slf4j.Logger;
+
+/**
+ * {@link FutureCallback} that reports how a named component's {@code Void} future completed with a
+ * single log line: INFO on success or cancellation, WARN (with stack trace) on any other failure.
+ */
+public final class LoggingFutureCallback implements FutureCallback<Void> {
+ private final String componentName;
+ private final Logger log;
+
+ /**
+ * @param componentName name that prefixes every message
+ * @param log logger the outcome is written to
+ */
+ public LoggingFutureCallback(String componentName, Logger log) {
+ this.componentName = componentName;
+ this.log = log;
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ log.info("{} exited", componentName);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof CancellationException) {
+ // Cancellation is logged without a stack trace. The message needs its own {} placeholder:
+ // SLF4J silently ignores non-Throwable arguments that have no placeholder.
+ log.info("{} was cancelled: {}", componentName, t.getMessage());
+ } else {
+ // A trailing Throwable argument makes SLF4J log it as the exception, with its stack trace.
+ log.warn("{} exited with error", componentName, t);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index b2cd55e..e4fe79c 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -15,7 +15,9 @@
package org.apache.hadoop.hive.llap.tezplugins;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
@@ -26,6 +28,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +40,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -44,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -62,7 +66,7 @@ public class TestLlapTaskSchedulerService {
private static final String HOST2 = "host2";
private static final String HOST3 = "host3";
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testSimpleLocalAllocation() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -77,18 +81,17 @@ public class TestLlapTaskSchedulerService {
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
+ tsWrapper.awaitLocalTaskAllocations(1);
verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
- // TODO Verify this is on host1.
assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
} finally {
tsWrapper.shutdown();
}
}
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -99,8 +102,7 @@ public class TestLlapTaskSchedulerService {
Object clientCookie1 = new Object();
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
+ tsWrapper.awaitTotalTaskAllocations(1);
verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
} finally {
@@ -109,7 +111,7 @@ public class TestLlapTaskSchedulerService {
}
- @Test(timeout=5000)
+ @Test(timeout = 10000)
public void testPreemption() throws InterruptedException, IOException {
Priority priority1 = Priority.newInstance(1);
@@ -174,7 +176,7 @@ public class TestLlapTaskSchedulerService {
}
- @Test(timeout=5000)
+ @Test(timeout = 10000)
public void testNodeDisabled() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
try {
@@ -233,7 +235,7 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test(timeout=5000)
+ @Test(timeout = 10000)
public void testNodeReEnabled() throws InterruptedException, IOException {
// Based on actual timing.
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
@@ -307,7 +309,7 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testForceLocalityTest1() throws IOException, InterruptedException {
// 2 hosts. 2 per host. 5 requests at the same priority.
// First 3 on host1, Next at host2, Last with no host.
@@ -316,7 +318,7 @@ public class TestLlapTaskSchedulerService {
}
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
// 2 hosts. 2 per host. 5 requests at the same priority.
// First 3 on host1, Next at host2, Last with no host.
@@ -411,7 +413,7 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test(timeout = 5000)
+ @Test(timeout = 10000)
public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
Priority priority1 = Priority.newInstance(1);
@@ -454,15 +456,13 @@ public class TestLlapTaskSchedulerService {
}
}
-
- @Test(timeout = 5000)
+ @Test(timeout = 10000)
public void testForcedLocalityPreemption() throws IOException, InterruptedException {
Priority priority1 = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
String [] hosts = new String[] {HOST1, HOST2};
String [] hostsH1 = new String[] {HOST1};
- String [] hostsH2 = new String[] {HOST2};
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
@@ -485,13 +485,7 @@ public class TestLlapTaskSchedulerService {
tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
// This request at a lower priority should not affect anything.
tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
- while (true) {
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
- if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
- break;
- }
- }
+ tsWrapper.awaitLocalTaskAllocations(2);
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
@@ -517,13 +511,8 @@ public class TestLlapTaskSchedulerService {
tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
- while (true) {
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
- if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
- break;
- }
- }
+ tsWrapper.awaitLocalTaskAllocations(3);
+
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
eq(clientCookie4), any(Container.class));
@@ -532,11 +521,471 @@ public class TestLlapTaskSchedulerService {
}
}
+ /** A locality delay of -1 forces locality, so a task that does not fit must not enter the delayed queue. */
+ @Test(timeout = 10000)
+ public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+ String[] clusterHosts = {HOST1, HOST2};
+ String[] requestedHosts = {HOST1};
+ long forcedLocality = -1L;
+ testNotInQueue(
+ new TestTaskSchedulerServiceWrapper(2000, clusterHosts, 1, 1, forcedLocality),
+ requestedHosts);
+ }
+
+ /** A locality delay of 0 disables the wait, so a task that does not fit must not enter the delayed queue. */
+ @Test(timeout = 10000)
+ public void testNoLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+ String[] clusterHosts = {HOST1};
+ String[] requestedHosts = {HOST1};
+ long noLocalityDelay = 0L;
+ testNotInQueue(
+ new TestTaskSchedulerServiceWrapper(2000, clusterHosts, 1, 1, noLocalityDelay),
+ requestedHosts);
+ }
+
+ /**
+ * Submits three requests for {@code hosts}, waits for two local allocations and checks that the
+ * request left over was not parked in the delayed queue. Always shuts the wrapper down.
+ */
+ private void testNotInQueue(TestTaskSchedulerServiceWrapper tsWrapper, String[] hosts)
+ throws InterruptedException {
+ Priority priority = Priority.newInstance(1);
+ try {
+ tsWrapper.controlScheduler(true);
+ // The callers configure 1 executor + 1 wait-queue slot, so the third request is 1 more than capacity.
+ for (int i = 0; i < 3; i++) {
+ tsWrapper.allocateTask(hosts, priority);
+ }
+ tsWrapper.awaitLocalTaskAllocations(2);
+ assertEquals(0, tsWrapper.ts.delayedTaskQueue.size());
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ /**
+ * With a 10s locality delay and the only requested host full, the extra task waits in the delayed
+ * queue and, once the delay expires, falls back to the other (non-local) host.
+ */
+ @Test(timeout = 10000)
+ public void testDelayedLocalityFallbackToNonLocal() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String[] hosts = new String[] {HOST1, HOST2};
+
+ String[] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000L, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ // Presumably pins the controlled clock, so time only advances when this test moves it.
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // No capacity left on node1. The next task should be allocated to node2 after it times out.
+ clock.setTime(clock.getTime() + 10000L); // Exactly reaches the 10s locality timeout.
+
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+ delayedTaskSchedulerCallableControlled.lastState);
+
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+ // The delay has expired: the task must come off the delayed queue and be accepted for scheduling.
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+ delayedTaskSchedulerCallableControlled.lastState);
+ assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+ delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+ tsWrapper.awaitChangeInTotalAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+ assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+ assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ /**
+ * With a 10s locality delay and the requested host full, the extra task waits. When the local host
+ * frees up before the delay expires, the task must be placed there rather than on the idle host.
+ */
+ @Test(timeout = 10000)
+ public void testDelayedLocalityDelayedAllocation() throws InterruptedException, IOException {
+ Priority priority1 = Priority.newInstance(1);
+ String[] hosts = new String[] {HOST1, HOST2};
+
+ String[] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000L, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ // Presumably pins the controlled clock, so time only advances when this test moves it.
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Move the clock forward 2000ms (still 8000ms short of the locality timeout) and check the
+ // delayed queue.
+ clock.setTime(clock.getTime() + 2000L);
+
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+ delayedTaskSchedulerCallableControlled.lastState);
+
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+ // The queued task has not timed out yet, so no scheduling attempt should have been made.
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+ delayedTaskSchedulerCallableControlled.lastState);
+ assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+ tsWrapper.deallocateTask(task1, true, null);
+
+ // Node1 now has free capacity. task3 should be allocated to it.
+ tsWrapper.awaitChangeInTotalAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+ assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+ assertEquals(3, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ /**
+ * A waiting task is placed locally once capacity frees up, before its locality delay expires.
+ * When the delay later expires, the delayed queue hands the task back, but the scheduler must skip
+ * it because it has already been scheduled.
+ */
+ @Test(timeout = 10000)
+ public void testDelayedQueeTaskSelectionAfterScheduled() throws IOException,
+ InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String[] hosts = new String[] {HOST1, HOST2};
+
+ String[] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000L, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ // Presumably pins the controlled clock, so time only advances when this test moves it.
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ // Simulate a 2s delay before finishing the task.
+ clock.setTime(clock.getTime() + 2000);
+
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+ delayedTaskSchedulerCallableControlled.lastState);
+
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+ // Still 8s short of the locality timeout: nothing should have been offered for scheduling.
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+ delayedTaskSchedulerCallableControlled.lastState);
+ assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Now finish task1, which will make capacity for task3 to run. Nothing is coming out of the delayed queue yet.
+ tsWrapper.deallocateTask(task1, true, null);
+ tsWrapper.awaitLocalTaskAllocations(3);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Move the clock forward and trigger a run.
+ clock.setTime(clock.getTime() + 8000); // Set to start + 10000 which is the timeout
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+ delayedTaskSchedulerCallableControlled.lastState);
+ // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+ // (task3 was already placed on HOST1 above).
+ assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+ !delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+ // Ensure there's no more invocations.
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ verify(tsWrapper.mockAppCallback, never()).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ /**
+ * Checks TaskInfo's delay bookkeeping for a positive locality delay (counts down with the clock),
+ * a zero delay (never waits, already past its timeout) and -1 (forced locality, never expires).
+ */
+ @Test(timeout = 10000)
+ public void testTaskInfoDelay() {
+ ControlledClock clock = new ControlledClock(new MonotonicClock());
+ clock.setTime(clock.getTime());
+
+ // 3000ms locality delay: the remaining delay tracks the clock down to zero.
+ LlapTaskSchedulerService.TaskInfo delayedTask = new LlapTaskSchedulerService.TaskInfo(
+ new LlapTaskSchedulerService.LocalityDelayConf(3000), clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+ assertFalse(delayedTask.shouldForceLocality());
+ assertEquals(3000, delayedTask.getDelay(TimeUnit.MILLISECONDS));
+ assertTrue(delayedTask.shouldDelayForLocality(clock.getTime()));
+
+ clock.setTime(clock.getTime() + 500);
+ assertEquals(2500, delayedTask.getDelay(TimeUnit.MILLISECONDS));
+ assertTrue(delayedTask.shouldDelayForLocality(clock.getTime()));
+
+ clock.setTime(clock.getTime() + 2500);
+ assertEquals(0, delayedTask.getDelay(TimeUnit.MILLISECONDS));
+ assertFalse(delayedTask.shouldDelayForLocality(clock.getTime()));
+
+ // Zero delay: never waits for locality and is already past its timeout.
+ LlapTaskSchedulerService.TaskInfo noDelayTask = new LlapTaskSchedulerService.TaskInfo(
+ new LlapTaskSchedulerService.LocalityDelayConf(0), clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+ assertFalse(noDelayTask.shouldDelayForLocality(clock.getTime()));
+ assertFalse(noDelayTask.shouldForceLocality());
+ assertTrue(noDelayTask.getDelay(TimeUnit.MILLISECONDS) < 0);
+
+ // -1: forced locality, so the task always waits and its delay never goes negative.
+ LlapTaskSchedulerService.TaskInfo forcedTask = new LlapTaskSchedulerService.TaskInfo(
+ new LlapTaskSchedulerService.LocalityDelayConf(-1), clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+ assertTrue(forcedTask.shouldDelayForLocality(clock.getTime()));
+ assertTrue(forcedTask.shouldForceLocality());
+ assertFalse(forcedTask.getDelay(TimeUnit.MILLISECONDS) < 0);
+ }
+
+ /**
+ * TaskInfos in a DelayQueue must be ordered by locality timeout: the task requested first (and so
+ * timing out first) has to be at the head regardless of insertion order.
+ */
+ @Test(timeout = 10000)
+ public void testLocalityDelayTaskOrdering() throws InterruptedException, IOException {
+ LlapTaskSchedulerService.LocalityDelayConf localityDelayConf =
+ new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+ ControlledClock clock = new ControlledClock(new MonotonicClock());
+ clock.setTime(clock.getTime());
+
+ DelayQueue<LlapTaskSchedulerService.TaskInfo> delayedQueue = new DelayQueue<>();
+
+ LlapTaskSchedulerService.TaskInfo taskInfo1 =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+ clock.setTime(clock.getTime() + 1000);
+ LlapTaskSchedulerService.TaskInfo taskInfo2 =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+ // Add the later-expiring task first, so insertion order alone cannot put taskInfo1 at the head;
+ // only the compareTo() ordering can.
+ delayedQueue.add(taskInfo2);
+ delayedQueue.add(taskInfo1);
+
+ assertEquals(taskInfo1, delayedQueue.peek());
+ }
+
+ /**
+ * Node1 is disabled by a communication failure while node2 has room. Because the node re-enable
+ * timeout (20s) exceeds the locality delay (10s), the pending task goes to node2 right away
+ * instead of waiting out the delay.
+ */
+ @Test(timeout = 15000)
+ public void testDelayedLocalityNodeCommErrorImmediateAllocation() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String[] hosts = new String[] {HOST1, HOST2};
+
+ String[] hostsH1 = new String[] {HOST1};
+
+ // Node disable timeout higher than locality delay.
+ TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(20000, hosts, 1, 1, 10000L);
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ long startTime = tsWrapper.getClock().getTime();
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Mark a task as failed due to a comm failure.
+ tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+ // Node1 marked as failed, node2 has capacity.
+ // Timeout for nodes is larger than delay - immediate allocation
+ tsWrapper.awaitChangeInTotalAllocations(2);
+
+ long thirdAllocateTime = tsWrapper.getClock().getTime();
+ long diff = thirdAllocateTime - startTime;
+ // The allocation must not have waited out the 10s locality delay.
+ assertTrue("Task not allocated in expected time window: duration=" + diff, diff < 10000L);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+ assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+ assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ /**
+ * Node1 is disabled by a communication failure while node2 has room. Because the locality delay
+ * (10s) exceeds the node re-enable timeout (5s), the pending task is expected to keep waiting for
+ * node1 rather than being placed on node2 straight away.
+ */
+ @Test(timeout = 15000)
+ public void testDelayedLocalityNodeCommErrorDelayedAllocation() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String[] hosts = new String[] {HOST1, HOST2};
+
+ String[] hostsH1 = new String[] {HOST1};
+
+ // Last argument requests the controlled delayed-task scheduler; presumably nothing leaves the
+ // delayed queue unless a test explicitly triggers it.
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(5000, hosts, 1, 1, 10000L, true);
+ ControlledClock clock = tsWrapper.getClock();
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Mark a task as failed due to a comm failure.
+ tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+ // Node1 has free capacity but is disabled; node2 has capacity. The locality delay is longer than
+ // the re-enable timeout, so nothing new should be allocated within the next 2s.
+ tsWrapper.ensureNoChangeInTotalAllocations(2, 2000L);
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
private static class TestTaskSchedulerServiceWrapper {
static final Resource resource = Resource.newInstance(1024, 1);
Configuration conf;
TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
- ControlledClock clock = new ControlledClock(new SystemClock());
+ ControlledClock clock = new ControlledClock(new MonotonicClock());
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
LlapTaskSchedulerServiceForTest ts;
@@ -555,14 +1004,21 @@ public class TestLlapTaskSchedulerService {
this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
}
- TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws
+ TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+ int waitQueueSize, long localityDelayMs) throws
+ IOException, InterruptedException {
+ this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, false);
+ }
+
+ TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+ int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
IOException, InterruptedException {
conf = new Configuration();
conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
- disableTimeoutMillis + "ms");
+ nodeDisableTimeoutMillis + "ms");
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
@@ -571,7 +1027,11 @@ public class TestLlapTaskSchedulerService {
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
- ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+ if (controlledDelayedTaskQueue) {
+ ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
+ } else {
+ ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+ }
controlScheduler(true);
ts.initialize();
@@ -582,6 +1042,10 @@ public class TestLlapTaskSchedulerService {
awaitSchedulerRun();
}
+ /** Returns the controlled clock this wrapper hands to the scheduler under test. */
+ ControlledClock getClock() {
+ return this.clock;
+ }
+
void controlScheduler(boolean val) { // Forwards to ts.forTestsetControlScheduling(); presumably toggles the controlScheduling flag read in schedulePendingTasks() — confirm.
ts.forTestsetControlScheduling(val);
}
@@ -591,8 +1055,19 @@ public class TestLlapTaskSchedulerService {
}
void awaitSchedulerRun() throws InterruptedException { // Waits with no timeout (-1) for the scheduler to report a completed scheduling run.
- ts.forTestAwaitSchedulingRun();
+ ts.forTestAwaitSchedulingRun(-1);
+ }
+
+ /**
+ * Waits up to {@code timeoutMs} for the scheduler to report a completed scheduling run, then
+ * resets the scheduler's completion flag so the next wait observes a fresh run.
+ *
+ * @param timeoutMs maximum time to wait, in milliseconds; -1 waits without a timeout
+ * @return false if the time elapsed before a completed run was signalled
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ boolean awaitSchedulerRun(long timeoutMs) throws InterruptedException {
+ return ts.forTestAwaitSchedulingRun(timeoutMs);
}
+
void resetAppCallback() { // Mockito reset(): clears recorded interactions and stubbing on the mocked TaskSchedulerContext.
reset(mockAppCallback);
}
@@ -605,6 +1080,8 @@ public class TestLlapTaskSchedulerService {
ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
}
+
+
void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { // Delegates to the scheduler; the trailing argument (presumably diagnostics) is passed as null.
ts.deallocateTask(task, succeeded, endReason, null);
}
@@ -612,6 +1089,60 @@ public class TestLlapTaskSchedulerService {
void rejectExecution(Object task) { // Simulates an executor rejecting the task: an unsuccessful deallocation with EXECUTOR_BUSY.
ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null);
}
+
+
+ // More complex methods which may wrap multiple operations
+ Object allocateTask(String[] hosts, Priority priority) {
+ Object task = new Object();
+ Object clientCookie = new Object();
+ allocateTask(task, hosts, priority, clientCookie);
+ return task;
+ }
+
+ /**
+ * Repeatedly signals and awaits scheduler runs until the total allocation count recorded in the
+ * DAG stats equals {@code numTasks}. It never gives up on its own; the caller (e.g. a test
+ * timeout) has to bound it.
+ */
+ public void awaitTotalTaskAllocations(int numTasks) throws InterruptedException {
+ do {
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ } while (ts.dagStats.numTotalAllocations != numTasks);
+ }
+
+ /**
+ * Repeatedly signals and awaits scheduler runs until the local allocation count recorded in the
+ * DAG stats equals {@code numTasks}. It never gives up on its own; the caller (e.g. a test
+ * timeout) has to bound it.
+ */
+ public void awaitLocalTaskAllocations(int numTasks) throws InterruptedException {
+ do {
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ } while (ts.dagStats.numLocalAllocations != numTasks);
+ }
+
+ /**
+ * Drives scheduler runs until the total allocation count rises above {@code previousAllocations}.
+ * The method pauses 200 ms between unsuccessful runs.
+ */
+ public void awaitChangeInTotalAllocations(int previousAllocations) throws InterruptedException {
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ while (ts.dagStats.numTotalAllocations <= previousAllocations) {
+ Thread.sleep(200L);
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ }
+ }
+
+ /**
+ * Keeps triggering scheduler runs for about {@code timeout} ms and checks after each run that the
+ * total allocation count is still {@code previousAllocations}.
+ *
+ * @throws IllegalStateException as soon as the allocation count differs
+ */
+ public void ensureNoChangeInTotalAllocations(int previousAllocations, long timeout) throws
+ InterruptedException {
+ final long deadline = Time.monotonicNow() + timeout;
+ for (long remaining = timeout; remaining > 0; remaining = deadline - Time.monotonicNow()) {
+ signalSchedulerRun();
+ // Bound each wait so the count is re-checked at least every 200 ms.
+ awaitSchedulerRun(Math.min(200, remaining));
+ if (ts.dagStats.numTotalAllocations != previousAllocations) {
+ throw new IllegalStateException("NumTotalAllocations expected to stay at "
+ + previousAllocations + ". Actual=" + ts.dagStats.numTotalAllocations);
+ }
+ }
+ }
}
private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
@@ -632,6 +1163,7 @@ public class TestLlapTaskSchedulerService {
@Override
protected void schedulePendingTasks() {
+ LOG.info("Attempted schedulPendingTasks");
testLock.lock();
try {
if (controlScheduling.get()) {
@@ -668,17 +1200,143 @@ public class TestLlapTaskSchedulerService {
}
}
- void forTestAwaitSchedulingRun() throws InterruptedException {
+ /**
+ * Waits for the scheduler to report a completed scheduling run, then clears the completion flag
+ * so the next call waits for a fresh run.
+ *
+ * @param timeout maximum wait in milliseconds; -1 waits without a timeout
+ * @return true if a completed run was observed, false if the timeout elapsed first
+ * @throws InterruptedException if interrupted while waiting
+ */
+ boolean forTestAwaitSchedulingRun(long timeout) throws InterruptedException {
testLock.lock();
try {
- while (!schedulingComplete) {
- schedulingCompleteCondition.await();
+ if (timeout == -1) {
+ while (!schedulingComplete) {
+ schedulingCompleteCondition.await();
+ }
+ } else {
+ // Carry the remaining time across wakeups and re-check the flag each time. A spurious
+ // wakeup must not be reported as success. A completion signalled right at the deadline
+ // must not be discarded as a timeout.
+ long nanosLeft = TimeUnit.MILLISECONDS.toNanos(timeout);
+ while (!schedulingComplete) {
+ if (nanosLeft <= 0) {
+ return false;
+ }
+ nanosLeft = schedulingCompleteCondition.awaitNanos(nanosLeft);
+ }
}
schedulingComplete = false;
+ return true;
} finally {
testLock.unlock();
}
}
}
+
+ /**
+ * Scheduler variant whose delayed-task callable inspects the delayed task queue only when a test
+ * explicitly triggers it. Tests can therefore step through locality-delay scheduling
+ * deterministically.
+ */
+ private static class LlapTaskSchedulerServiceForTestControlled extends LlapTaskSchedulerServiceForTest {
+
+ private DelayedTaskSchedulerCallableControlled controlledTSCallable;
+
+ public LlapTaskSchedulerServiceForTestControlled(
+ TaskSchedulerContext appClient, Clock clock) {
+ super(appClient, clock);
+ }
+
+ /** Installs the test-controlled callable in place of the default delayed-task callable. */
+ @Override
+ LlapTaskSchedulerService.DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+ controlledTSCallable = new DelayedTaskSchedulerCallableControlled();
+ return controlledTSCallable;
+ }
+
+ /**
+ * Delayed-task callable that processes one queue inspection per {@link #triggerGetNextTask()}.
+ * It records the outcome in {@link #lastState}, so tests can wait with
+ * {@link #awaitGetNextTaskProcessing()} and assert what happened.
+ */
+ class DelayedTaskSchedulerCallableControlled extends DelayedTaskSchedulerCallable {
+ private final ReentrantLock lock = new ReentrantLock();
+ // triggerRunCondition/shouldRun: set by triggerGetNextTask(), consumed by each getNextTask() pass.
+ private final Condition triggerRunCondition = lock.newCondition();
+ private boolean shouldRun = false;
+ // runCompleteCondition/runComplete: set when a pass ends without returning a task, or after an
+ // evicted task has been processed; consumed by awaitGetNextTaskProcessing().
+ private final Condition runCompleteCondition = lock.newCondition();
+ private boolean runComplete = false;
+
+ // Possible values of lastState, describing the outcome of the most recent getNextTask() pass.
+ static final int STATE_NOT_RUN = 0;
+ static final int STATE_NULL_FOUND = 1;
+ static final int STATE_TIMEOUT_NOT_EXPIRED = 2;
+ static final int STATE_RETURNED_TASK = 3;
+
+ volatile int lastState = STATE_NOT_RUN;
+
+ volatile boolean lastShouldScheduleTaskResult = false;
+ volatile boolean shouldScheduleTaskTriggered = false;
+
+ @Override
+ public void processEvictedTask(TaskInfo taskInfo) {
+ super.processEvictedTask(taskInfo);
+ signalRunComplete();
+ }
+
+ /**
+ * Blocks until {@link #triggerGetNextTask()} is called, then inspects the head of the delayed
+ * task queue once per trigger. The head is removed and returned only when, according to the
+ * controlled clock, it no longer needs to be delayed for locality. Otherwise the outcome is
+ * recorded, completion is signalled, and the method waits for the next trigger.
+ */
+ @Override
+ public TaskInfo getNextTask() throws InterruptedException {
+ while (true) {
+ lock.lock();
+ try {
+ while (!shouldRun) {
+ triggerRunCondition.await();
+ }
+ // Prevent subsequent runs until a new trigger is set.
+ shouldRun = false;
+ } finally {
+ lock.unlock();
+ }
+ TaskInfo taskInfo = delayedTaskQueue.peek();
+ if (taskInfo == null) {
+ LOG.info("Triggered getTask but the queue is empty");
+ lastState = STATE_NULL_FOUND;
+ signalRunComplete();
+ continue;
+ }
+ if (taskInfo.shouldDelayForLocality(
+ LlapTaskSchedulerServiceForTestControlled.this.clock.getTime())) {
+ LOG.info("Triggered getTask but the first element is not ready to execute");
+ lastState = STATE_TIMEOUT_NOT_EXPIRED;
+ signalRunComplete();
+ continue;
+ }
+ // Remove exactly the element that was peeked and checked above. poll() removes whatever
+ // is at the head at that moment, or nothing if the queue applies its own expiry check (as
+ // a DelayQueue would). Either way, the task being returned could stay queued.
+ delayedTaskQueue.remove(taskInfo);
+ lastState = STATE_RETURNED_TASK;
+ return taskInfo;
+ }
+ }
+
+ /** Records that this hook ran and what the base implementation decided. */
+ @Override
+ public boolean shouldScheduleTask(TaskInfo taskInfo) {
+ shouldScheduleTaskTriggered = true;
+ lastShouldScheduleTaskResult = super.shouldScheduleTask(taskInfo);
+ return lastShouldScheduleTaskResult;
+ }
+
+ /** Clears the information captured by {@link #shouldScheduleTask(TaskInfo)}. */
+ void resetShouldScheduleInformation() {
+ shouldScheduleTaskTriggered = false;
+ lastShouldScheduleTaskResult = false;
+ }
+
+ private void signalRunComplete() {
+ lock.lock();
+ try {
+ runComplete = true;
+ runCompleteCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** Allows one pass of {@link #getNextTask()} to inspect the delayed task queue. */
+ void triggerGetNextTask() {
+ lock.lock();
+ try {
+ shouldRun = true;
+ triggerRunCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** Blocks until a pass completes (or an evicted task is processed), then consumes that signal. */
+ void awaitGetNextTaskProcessing() throws InterruptedException {
+ lock.lock();
+ try {
+ while (!runComplete) {
+ runCompleteCondition.await();
+ }
+ runComplete = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
}