You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by da...@apache.org on 2016/07/14 19:48:01 UTC
incubator-myriad git commit: MYRIAD-220 Initial check-in Encapsulates
changes to implement MYRIAD-220 along with enhanced/added comments JIRA:
[MYRIAD-220] https://issues.apache.org/jira/browse/MYRIAD-220 Pull Request:
Closes #84 Author: hokiegeek
Repository: incubator-myriad
Updated Branches:
refs/heads/master 4a6e50c41 -> 7207e2b04
MYRIAD-220 Initial check-in
Encapsulates changes to implement MYRIAD-220 along with enhanced/added comments
JIRA:
[MYRIAD-220] https://issues.apache.org/jira/browse/MYRIAD-220
Pull Request:
Closes #84
Author: hokiegeek2 <ho...@gmail.com>
Date: Thu Jun 30 16:03:44 2016 -0400
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/7207e2b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/7207e2b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/7207e2b0
Branch: refs/heads/master
Commit: 7207e2b04d8c9a0d74376cdeca8216fd237c960c
Parents: 4a6e50c
Author: hokiegeek2 <ho...@gmail.com>
Authored: Thu Jun 30 16:03:44 2016 -0400
Committer: darinj <da...@apache.org>
Committed: Thu Jul 14 15:03:10 2016 -0400
----------------------------------------------------------------------
.../recovery/MyriadFileSystemRMStateStore.java | 3 +-
.../apache/myriad/scheduler/MyriadDriver.java | 51 ++++++++++-
.../myriad/scheduler/MyriadScheduler.java | 40 ++++++++-
.../apache/myriad/scheduler/TaskTerminator.java | 92 ++++++++++++++------
.../org/apache/myriad/scheduler/TaskUtils.java | 3 -
.../handlers/ResourceOffersEventHandler.java | 6 +-
.../handlers/StatusUpdateEventHandler.java | 58 +++++++++---
.../scheduler/fgs/YarnNodeCapacityManager.java | 38 ++++++--
8 files changed, 231 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
index 6257ffc..99078c0 100644
--- a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
+++ b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -101,7 +102,7 @@ public class MyriadFileSystemRMStateStore extends FileSystemRMStateStore impleme
@Override
public synchronized StoreContext loadMyriadState() throws Exception {
StoreContext sc = null;
- if (myriadStateBytes != null && myriadStateBytes.length > 0) {
+ if (ArrayUtils.isNotEmpty(myriadStateBytes)) {
sc = StoreContext.fromSerializedBytes(myriadStateBytes);
myriadStateBytes = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
index 014516d..31656fb 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java
@@ -26,7 +26,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Driver for Myriad scheduler.
+ * The MyriadDriver class is a wrapper for the Mesos SchedulerDriver class. Accordingly,
+ * all public MyriadDriver methods delegate to the corresponding SchedulerDriver methods.
*/
public class MyriadDriver {
private static final Logger LOGGER = LoggerFactory.getLogger(MyriadDriver.class);
@@ -38,6 +39,19 @@ public class MyriadDriver {
this.driver = driver;
}
+ /**
+ * Stops the underlying Mesos SchedulerDriver. If the failover flag is set to
+ * false, Myriad will not reconnect to Mesos. Consequently, Mesos will unregister
+ * the Myriad framework and shut down all the Myriad tasks and executors. If failover
+ * is set to true, all Myriad executors and tasks will remain running for a defined
+ * period of time, allowing the MyriadScheduler to reconnect to Mesos.
+ *
+ * @param failover Whether framework failover is expected.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
public Status stop(boolean failover) {
LOGGER.info("Stopping driver");
Status status = driver.stop(failover);
@@ -45,6 +59,14 @@ public class MyriadDriver {
return status;
}
+ /**
+ * Starts the underlying Mesos SchedulerDriver. Note: this method must
+ * be called before any other MyriadDriver methods are invoked.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
public Status start() {
LOGGER.info("Starting driver");
Status status = driver.start();
@@ -52,16 +74,41 @@ public class MyriadDriver {
return status;
}
+ /**
+ * Kills the specified task via the underlying Mesos SchedulerDriver.
+ * Important note from the Mesos documentation: "attempting to kill a
+ * task is currently not reliable. If, for example, a scheduler fails over
+ * while it was attempting to kill a task it will need to retry in
+ * the future. Likewise, if unregistered / disconnected, the request
+ * will be dropped (these semantics may be changed in the future)."
+ *
+ * @param taskId The ID of the task to be killed.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
public Status kill(final TaskID taskId) {
Status status = driver.killTask(taskId);
LOGGER.info("Task {} killed with status: {}", taskId, status);
return status;
}
+ /**
+ * Aborts the underlying Mesos SchedulerDriver so that no more callbacks
+ * can be made to the MyriadScheduler. Note from Mesos documentation:
+ * The semantics of abort and stop have deliberately been separated so that
+ * code can detect an aborted driver and instantiate and start another driver
+ * if desired (from within the same process).
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
public Status abort() {
LOGGER.info("Aborting driver");
Status status = driver.abort();
- LOGGER.info("Driver aborted with status: {}", status);
+ LOGGER.info("Aborted driver with status: {}", status);
return status;
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
index cb850ab..561d36e 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java
@@ -18,9 +18,10 @@
*/
package org.apache.myriad.scheduler;
-import com.lmax.disruptor.EventTranslator;
import java.util.List;
+
import javax.inject.Inject;
+
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
@@ -36,8 +37,11 @@ import org.apache.myriad.scheduler.event.ResourceOffersEvent;
import org.apache.myriad.scheduler.event.SlaveLostEvent;
import org.apache.myriad.scheduler.event.StatusUpdateEvent;
+import com.lmax.disruptor.EventTranslator;
+
/**
- * Myriad Scheduler
+ * The Myriad implementation of the Mesos Scheduler callback interface, where the method implementations
+ * publish Myriad framework events corresponding to the Mesos callbacks.
*/
public class MyriadScheduler implements Scheduler {
private org.apache.myriad.DisruptorManager disruptorManager;
@@ -47,6 +51,9 @@ public class MyriadScheduler implements Scheduler {
this.disruptorManager = disruptorManager;
}
+ /**
+ * Publishes a RegisteredEvent
+ */
@Override
public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) {
disruptorManager.getRegisteredEventDisruptor().publishEvent(new EventTranslator<RegisteredEvent>() {
@@ -59,6 +66,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes a ReRegisteredEvent
+ */
@Override
public void reregistered(final SchedulerDriver driver, final Protos.MasterInfo masterInfo) {
disruptorManager.getReRegisteredEventDisruptor().publishEvent(new EventTranslator<ReRegisteredEvent>() {
@@ -70,6 +80,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes a ResourceOffersEvent
+ */
@Override
public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) {
disruptorManager.getResourceOffersEventDisruptor().publishEvent(new EventTranslator<ResourceOffersEvent>() {
@@ -81,6 +94,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes an OfferRescindedEvent
+ */
@Override
public void offerRescinded(final SchedulerDriver driver, final Protos.OfferID offerId) {
disruptorManager.getOfferRescindedEventDisruptor().publishEvent(new EventTranslator<OfferRescindedEvent>() {
@@ -92,6 +108,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes a StatusUpdateEvent
+ */
@Override
public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus status) {
disruptorManager.getStatusUpdateEventDisruptor().publishEvent(new EventTranslator<StatusUpdateEvent>() {
@@ -103,6 +122,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes a FrameworkMessageEvent
+ */
@Override
public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId,
final byte[] bytes) {
@@ -117,6 +139,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes a DisconnectedEvent
+ */
@Override
public void disconnected(final SchedulerDriver driver) {
disruptorManager.getDisconnectedEventDisruptor().publishEvent(new EventTranslator<DisconnectedEvent>() {
@@ -127,6 +152,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes a SlaveLostEvent
+ */
@Override
public void slaveLost(final SchedulerDriver driver, final Protos.SlaveID slaveId) {
disruptorManager.getSlaveLostEventDisruptor().publishEvent(new EventTranslator<SlaveLostEvent>() {
@@ -138,6 +166,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes an ExecutorLostEvent
+ */
@Override
public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId,
final int exitStatus) {
@@ -152,6 +183,9 @@ public class MyriadScheduler implements Scheduler {
});
}
+ /**
+ * Publishes an ErrorEvent
+ */
@Override
public void error(final SchedulerDriver driver, final String message) {
disruptorManager.getErrorEventDisruptor().publishEvent(new EventTranslator<ErrorEvent>() {
@@ -162,4 +196,4 @@ public class MyriadScheduler implements Scheduler {
}
});
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
index 6be653b..4110b37 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java
@@ -18,10 +18,10 @@
*/
package org.apache.myriad.scheduler;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import java.util.Set;
+
import javax.inject.Inject;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskID;
@@ -31,8 +31,12 @@ import org.apache.myriad.state.SchedulerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
/**
- * {@link TaskTerminator} is responsible for killing tasks.
+ * {@link TaskTerminator} is basically a reaper process responsible for killing
+ * tasks marked as Killable by {@link MyriadOperations} that are stored
+ * within a {@link SchedulerState} object
*/
public class TaskTerminator implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskTerminator.class);
@@ -49,36 +53,68 @@ public class TaskTerminator implements Runnable {
this.offerLifeCycleManager = offerLifecycleManager;
}
+ /**
+ * Encapsulates logic that retrieves the collection of killable tasks from the
+ * SchedulerState object. If a task is in pending state, the task is simply
+ * removed from SchedulerState. Any tasks in a running state were not successfully
+ * killed by Mesos or the callback failed, so another kill attempt is made.
+ */
@Override
- public void run() {
- // clone a copy of the killable tasks
- Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks());
-
- if (CollectionUtils.isEmpty(killableTasks)) {
- return;
- }
+ public void run() {
+ //If there are 1..n killable tasks, proceed; otherwise, simply return
+ if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) {
+ /*
+ * Clone the killable task collection, iterate through all tasks, and
+ * process any pending and/or non-pending tasks
+ */
+ Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks());
+ Status driverStatus = driverManager.getDriverStatus();
- Status driverStatus = driverManager.getDriverStatus();
- if (Status.DRIVER_RUNNING != driverStatus) {
- LOGGER.warn("Cannot kill tasks, as driver is not running. Status: {}", driverStatus);
- return;
- }
+ //TODO (hokiegeek2) Can the DriverManager be restarted? If not, should the ResourceManager stop?
+ if (Status.DRIVER_RUNNING != driverStatus) {
+ LOGGER.warn("Cannot kill tasks because Mesos Driver is not running. Status: {}", driverStatus);
+ return;
+ }
- for (TaskID taskIdToKill : killableTasks) {
- if (this.schedulerState.getPendingTaskIds().contains(taskIdToKill)) {
- this.schedulerState.removeTask(taskIdToKill);
- } else {
- Status status = this.driverManager.kill(taskIdToKill);
- NodeTask task = schedulerState.getTask(taskIdToKill);
- if (task != null) {
- offerLifeCycleManager.declineOutstandingOffers(task.getHostname());
- this.schedulerState.removeTask(taskIdToKill);
+ for (TaskID taskIdToKill : killableTasks) {
+ LOGGER.info("Received task kill request for task: {}", taskIdToKill);
+ if (isPendingTask(taskIdToKill)) {
+ handlePendingTask(taskIdToKill);
} else {
- schedulerState.removeTask(taskIdToKill);
- LOGGER.warn("NodeTask with taskId: {} does not exist", taskIdToKill);
+ handleNonPendingTask(taskIdToKill);
}
- Preconditions.checkState(status == Status.DRIVER_RUNNING);
}
}
}
-}
+
+ private void handlePendingTask(TaskID taskId) {
+ /*
+ * since task is pending and has not started, simply remove
+ * it from SchedulerState task collection
+ */
+ schedulerState.removeTask(taskId);
+ }
+
+ private void handleNonPendingTask(TaskID taskId) {
+ /*
+ * Kill the task and decline additional offers for it, but hold off removing from SchedulerState.
+ * Removal of the killable task must be done following invocation of statusUpdate callback method
+ * which constitutes acknowledgement from Mesos that the kill task request succeeded.
+ */
+ Status status = this.driverManager.kill(taskId);
+ NodeTask task = schedulerState.getTask(taskId);
+
+ if (task != null) {
+ offerLifeCycleManager.declineOutstandingOffers(task.getHostname());
+ }
+ if (status.equals(Status.DRIVER_RUNNING)) {
+ LOGGER.info("Kill request for {} was submitted to a running SchedulerDriver", taskId);
+ } else {
+ LOGGER.warn("Kill task request for {} submitted to non-running SchedulerDriver, may fail", taskId);
+ }
+ }
+
+ private boolean isPendingTask(TaskID taskId) {
+ return this.schedulerState.getPendingTaskIds().contains(taskId);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
index d73a467..c8e2a21 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
@@ -75,9 +75,6 @@ public class TaskUtils {
private static final String CONTAINER_PATH_KEY = "containerPath";
private static final String HOST_PATH_KEY = "hostPath";
private static final String RW_MODE = "mode";
- private static final String CONTAINER_PORT_KEY = "containerPort";
- private static final String HOST_PORT_KEY = "hostPort";
- private static final String PROTOCOL_KEY = "protocol";
private static final String PARAMETER_KEY_KEY = "key";
private static final String PARAMETER_VALUE_KEY = "value";
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 8d1cd03..f0e80e9 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -97,7 +97,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
}
return;
}
- LOGGER.info("Received offers {}", offers.size());
+ LOGGER.debug("Received offers {}", offers.size());
LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
driverOperationLock.lock();
try {
@@ -218,7 +218,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) {
return true;
} else {
- LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports);
+ LOGGER.debug("Offer insufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports);
return false;
}
}
@@ -243,7 +243,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
private void checkResource(boolean fail, String resource) {
if (fail) {
- LOGGER.info("No " + resource + " resources present");
+ LOGGER.debug("No " + resource + " resources present");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
index 25d0440..079df4b 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -31,7 +31,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * handles and logs mesos status update events
+ * Handles and logs mesos StatusUpdateEvents based upon the corresponding
+ * Protos.TaskState enum value
*/
public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> {
@@ -45,7 +46,21 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent>
this.schedulerState = schedulerState;
this.offerLifecycleManager = offerLifecycleManager;
}
-
+
+ /**
+ * Encapsulates the logic to log and respond to the incoming StatusUpdateEvent per the
+ * Event TaskStatus state:
+ *
+ * 1. TASK_STAGING: mark task as staging within SchedulerState
+ * 2. TASK_STARTING: mark task as staging within SchedulerState
+ * 3. TASK_RUNNING: mark task as active within SchedulerState
+ * 4. TASK_FINISHED: decline outstanding offers and remove task from SchedulerState
+ * 5. TASK_FAILED: decline outstanding offers, remove killable failed tasks from SchedulerState,
+ * and mark non-killable failed tasks as pending
+ * 6. TASK_KILLED: decline outstanding offers, remove killed tasks from SchedulerState
+ * 7. TASK_LOST: decline outstanding offers, remove killable lost tasks from SchedulerState,
+ * and mark non-killable lost tasks as pending
+ */
@Override
public void onEvent(StatusUpdateEvent event, long sequence, boolean endOfBatch) throws Exception {
TaskStatus status = event.getStatus();
@@ -71,25 +86,44 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent>
schedulerState.makeTaskActive(taskId);
break;
case TASK_FINISHED:
- offerLifecycleManager.declineOutstandingOffers(task.getHostname());
- schedulerState.removeTask(taskId);
+ cleanupTask(taskId, task, "finished");
break;
case TASK_FAILED:
- // Add to pending tasks
- offerLifecycleManager.declineOutstandingOffers(task.getHostname());
- schedulerState.makeTaskPending(taskId);
+ cleanupFailedTask(taskId, task, "failed");
break;
case TASK_KILLED:
- offerLifecycleManager.declineOutstandingOffers(task.getHostname());
- schedulerState.removeTask(taskId);
+ cleanupTask(taskId, task, "killed");
break;
case TASK_LOST:
- offerLifecycleManager.declineOutstandingOffers(task.getHostname());
- schedulerState.makeTaskPending(taskId);
+ cleanupFailedTask(taskId, task, "lost");
break;
default:
LOGGER.error("Invalid state: {}", state);
break;
}
}
-}
+
+ private void cleanupFailedTask(TaskID taskId, NodeTask task, String stopReason) {
+ offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+ /*
+ * Remove the task from SchedulerState if the task is killable. Otherwise,
+ * mark the task as pending to enable restart.
+ */
+ if (taskIsKillable(taskId)) {
+ schedulerState.removeTask(taskId);
+ LOGGER.info("Removed killable, {} task with id {}", stopReason, taskId);
+ } else {
+ schedulerState.makeTaskPending(taskId);
+ LOGGER.info("Marked as pending {} task with id {}", stopReason, taskId);
+ }
+ }
+
+ private void cleanupTask(TaskID taskId, NodeTask task, String stopReason) {
+ offerLifecycleManager.declineOutstandingOffers(task.getHostname());
+ schedulerState.removeTask(taskId);
+ LOGGER.info("Removed {} task with id {}", stopReason, taskId);
+ }
+ private boolean taskIsKillable(TaskID taskId) {
+ return schedulerState.getKillableTasks().contains(taskId);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/7207e2b0/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index e922fc6..8f7c6f5 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -127,13 +127,15 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
}
private void removeYarnTask(RMContainer rmContainer) {
- if (rmContainer != null && rmContainer.getContainer() != null) {
+ if (containersNotNull(rmContainer)){
Protos.TaskID taskId = containerToTaskId(rmContainer);
- //TODO (darinj) Reliable messaging
+ /*
+ * Mark the task as killable within the SchedulerState object so that
+ * the TaskTerminator daemon will kill it
+ */
state.makeTaskKillable(taskId);
- myriadDriver.kill(taskId);
- String hostname = rmContainer.getContainer().getNodeId().getHost();
- Node node = nodeStore.getNode(hostname);
+
+ Node node = retrieveNode(rmContainer);
if (node != null) {
RMNode rmNode = node.getNode().getRMNode();
Resource resource = rmContainer.getContainer().getResource();
@@ -141,11 +143,20 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
LOGGER.info("Removed task yarn_{} with exit status freeing {} cpu and {} mem.", rmContainer.getContainer().toString(),
rmContainer.getContainerExitStatus(), resource.getVirtualCores(), resource.getMemory());
} else {
- LOGGER.warn(hostname + " not found");
+ LOGGER.warn("The Node for the {} host was not found", rmContainer.getContainer().getNodeId().getHost());
}
}
}
+ private Node retrieveNode(RMContainer container) {
+ String hostname = container.getContainer().getNodeId().getHost();
+ return nodeStore.getNode(hostname);
+ }
+
+ private boolean containersNotNull(RMContainer rmContainer) {
+ return (rmContainer != null && rmContainer.getContainer() != null);
+ }
+
@Override
public void afterSchedulerEventHandled(SchedulerEvent event) {
switch (event.getType()) {
@@ -182,7 +193,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
/**
* Checks if any containers were allocated in the current scheduler run and
- * launches the corresponding Mesos tasks. It also udpates the node
+ * launches the corresponding Mesos tasks. It also updates the node
* capacity depending on what portion of the consumed offers were actually
* used.
*/
@@ -232,11 +243,22 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
node.removeContainerSnapshot();
}
-
+ /**
+ * Increments the capacity for the specified RMNode
+ *
+ * @param rmNode the RMNode whose total capability is increased
+ * @param addedCapacity the Resource added to the node's total capability
+ */
public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) {
setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity));
}
+ /**
+ * Decrements the capacity for the specified RMNode
+ *
+ * @param rmNode
+ * @param removedCapacity
+ */
public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) {
setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity));
}