You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ji...@apache.org on 2012/09/14 08:24:50 UTC
svn commit: r1384653 - in
/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager:
ActionDBAccessor.java ActionManager.java ActionScheduler.java
HostAction.java HostRoleCommand.java Stage.java
Author: jitendra
Date: Fri Sep 14 06:24:50 2012
New Revision: 1384653
URL: http://svn.apache.org/viewvc?rev=1384653&view=rev
Log:
AMBARI-722. Action scheduler implementation.
Modified:
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java Fri Sep 14 06:24:50 2012
@@ -19,9 +19,13 @@ package org.apache.ambari.server.actionm
import java.util.List;
+import org.apache.ambari.server.Role;
+
public class ActionDBAccessor {
+
+ private long stageId = 0;
+
public void persistAction(HostAction ha) {
-
}
public Stage getAction(String actionId) {
@@ -31,4 +35,34 @@ public class ActionDBAccessor {
public List<Stage> getAllStages(String requestId) {
return null;
}
+
+ /**
+ * Returns all the actions that have been queued but not completed yet.
+ * This is used by scheduler to find all pending actions.
+ */
+ public List<Stage> getQueuedStages() {
+ return null;
+ }
+
+ /**
+ * Returns all the actions that have not been queued yet.
+ */
+ public List<Stage> getNotQueuedStages() {
+ return null;
+ }
+
+ /**
+ * Returns next stage id in the sequence, must be persisted.
+ */
+ public synchronized long getNextStageId() {
+ return ++stageId ;
+ }
+
+ public void abortOperation(long requestId) {
+ //Mark all pending or queued actions for this request as aborted.
+ }
+
+ public void timeoutHostRole(long requestId, long stageId, Role role) {
+ // TODO Auto-generated method stub
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java Fri Sep 14 06:24:50 2012
@@ -19,6 +19,7 @@ package org.apache.ambari.server.actionm
import java.util.List;
+import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.CommandReport;
@@ -26,6 +27,24 @@ import org.apache.ambari.server.agent.Co
* This class acts as the interface for action manager with other components.
*/
public class ActionManager {
+ private final ActionScheduler scheduler;
+ private final ActionDBAccessor db;
+ private final ActionQueue actionQueue;
+ public ActionManager(long schedulerSleepTime, long actionTimeout, ActionQueue aq) {
+ this.actionQueue = aq;
+ db = new ActionDBAccessor();
+ scheduler = new ActionScheduler(schedulerSleepTime,
+ actionTimeout, db, actionQueue);
+ }
+
+ public void initialize() {
+ scheduler.start();
+ }
+
+ public void shutdown() {
+ scheduler.stop();
+ }
+
public void sendActions(List<Stage> stages) {
//Store all these actions to the db
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Fri Sep 14 06:24:50 2012
@@ -17,27 +17,204 @@
*/
package org.apache.ambari.server.actionmanager;
-//This class encapsulates the action scheduler thread.
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.ExecutionCommand;
+
+//This class encapsulates the action scheduler thread.
//Action schedule frequently looks at action database and determines if
//there is an action that can be scheduled.
-public class ActionScheduler implements Runnable {
-
+class ActionScheduler implements Runnable {
+
private final long actionTimeout;
private final long sleepTime;
-
- public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec) {
+ private volatile boolean shouldRun = true;
+ private Thread schedulerThread = null;
+ private final ActionDBAccessor db;
+ private final short maxAttempts = 2;
+ private final ActionQueue actionQueue;
+
+ public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
+ ActionDBAccessor db, ActionQueue actionQueue) {
this.sleepTime = sleepTimeMilliSec;
this.actionTimeout = actionTimeoutMilliSec;
+ this.db = db;
+ this.actionQueue = actionQueue;
+ }
+
+ public void start() {
+ schedulerThread = new Thread(this);
+ schedulerThread.start();
+ }
+
+ /**
+  * Asks the scheduler loop to exit and interrupts the scheduler thread so
+  * that a pending sleep in run() returns immediately.
+  * Does nothing beyond clearing the run flag when start() was never called.
+  */
+ public void stop() {
+   shouldRun = false;
+   // schedulerThread is only assigned in start(); guard against stop()
+   // being called first, which would otherwise throw NullPointerException.
+   if (schedulerThread != null) {
+     schedulerThread.interrupt();
+   }
}
@Override
public void run() {
- try {
- //Check db for any pending actions and determine if something can be scheduled.
- Thread.sleep(sleepTime);
- } catch (InterruptedException ex) {
- //Shutting down;
+ while (shouldRun) {
+ try {
+ doWork();
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ex) {
+ shouldRun = false;
+ } catch (Exception ex) {
+ //ignore
+ //Log the exception
+ }
+ }
+ }
+
+ private void doWork() {
+ List<Stage> stages = db.getQueuedStages();
+ if (stages == null || stages.isEmpty()) {
+ //Nothing to do
return;
}
+
+ //First discover completions and timeouts.
+ boolean operationFailure = false;
+ for (Stage s : stages) {
+ Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = getInvertedRoleMap(s);
+
+ //Iterate for completion
+ boolean moveToNextStage = true;
+ for (Role r: roleToHrcMap.keySet()) {
+ processPendingsAndReschedule(s, roleToHrcMap.get(r));
+ RoleStatus roleStatus = getRoleStatus(roleToHrcMap.get(r), s.getSuccessFactor(r));
+ if (!roleStatus.isRoleSuccessful()) {
+ if (!roleStatus.isRoleInProgress()) {
+ //The role has completely failed
+ //Mark the entire operation as failed
+ operationFailure = true;
+ break;
+ }
+ moveToNextStage = false;
+ }
+ }
+ if (operationFailure) {
+ db.abortOperation(s.getRequestId());
+ }
+ if (operationFailure || !moveToNextStage) {
+ break;
+ }
+ }
+ }
+
+ private void processPendingsAndReschedule(Stage stage,
+ Map<String, HostRoleCommand> hrcMap) {
+ for (String host : hrcMap.keySet()) {
+ HostRoleCommand hrc = hrcMap.get(host);
+ long now = System.currentTimeMillis();
+ if (now > hrc.getExpiryTime()) {
+ // expired
+ if (now > hrc.getStartTime() + actionTimeout * maxAttempts) {
+ // final expired
+ db.timeoutHostRole(stage.getRequestId(), stage.getStageId(), hrc.getRole());
+ } else {
+ rescheduleHostRole(stage, hrc);
+ }
+ }
+ }
+ }
+
+ /**
+  * Re-sends the command of the given host role command to its host through
+  * the action queue and opens a new timeout window for it.
+  *
+  * @param s   stage the command belongs to; supplies the command id and manifest
+  * @param hrc host role command whose previous attempt has expired
+  */
+ private void rescheduleHostRole(Stage s,
+ HostRoleCommand hrc) {
+   long now = System.currentTimeMillis();
+   // The caller treats "now > expiryTime" as expired. Setting the expiry to
+   // "now" would make this command look expired again on the very next
+   // scheduler pass and re-enqueue it every pass, so give the new attempt a
+   // full actionTimeout before it is considered expired.
+   hrc.setExpiryTime(now + actionTimeout);
+   ExecutionCommand cmd = new ExecutionCommand();
+   cmd.setCommandId(s.getActionId());
+   cmd.setManifest(s.getManifest(hrc.getHostName()));
+   actionQueue.enqueue(hrc.getHostName(), cmd);
+ }
+
+ private RoleStatus getRoleStatus(
+ Map<String, HostRoleCommand> hostRoleCmdForRole, float successFactor) {
+ RoleStatus rs = new RoleStatus(hostRoleCmdForRole.size(), successFactor);
+ for (String h : hostRoleCmdForRole.keySet()) {
+ HostRoleCommand hrc = hostRoleCmdForRole.get(h);
+ switch (hrc.getStatus()) {
+ case COMPLETED:
+ rs.numSucceeded++;
+ break;
+ case FAILED:
+ rs.numFailed++;
+ break;
+ case QUEUED:
+ rs.numQueued++;
+ break;
+ case PENDING:
+ rs.numPending++;
+ break;
+ case TIMEDOUT:
+ rs.numTimedOut++;
+ break;
+ case ABORTED:
+ rs.numAborted++;
+ }
+ }
+ return rs;
+ }
+
+ private Map<Role, Map<String, HostRoleCommand>> getInvertedRoleMap(Stage s) {
+ // Temporary to store role to host
+ Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = new TreeMap<Role, Map<String, HostRoleCommand>>();
+ Map<String, HostAction> hostActions = s.getHostActions();
+ for (String h : hostActions.keySet()) {
+ HostAction ha = hostActions.get(h);
+ List<HostRoleCommand> roleCommands = ha.getRoleCommands();
+ for (HostRoleCommand hrc : roleCommands) {
+ Map<String, HostRoleCommand> hrcMap = roleToHrcMap.get(hrc.getRole());
+ if (hrcMap == null) {
+ hrcMap = new TreeMap<String, HostRoleCommand>();
+ roleToHrcMap.put(hrc.getRole(), hrcMap);
+ }
+ hrcMap.put(h, hrc);
+ }
+ }
+ return roleToHrcMap;
+ }
+
+ /**
+  * Per-role tally of host command states within one stage.
+  * The counters are filled in by getRoleStatus(); the predicates below
+  * derive the overall state of the role from them.
+  */
+ static class RoleStatus {
+   int numQueued = 0;
+   int numSucceeded = 0;
+   int numFailed = 0;
+   int numTimedOut = 0;
+   int numPending = 0;
+   int numAborted = 0;
+   // Number of hosts the role is being run on in this stage.
+   final int totalHosts;
+   // Minimum fraction of hosts that must succeed for the role to succeed.
+   final float successFactor;
+
+   RoleStatus(int total, float successFactor) {
+     this.totalHosts = total;
+     this.successFactor = successFactor;
+   }
+
+   /**
+    * Returns true when the fraction of succeeded hosts has reached the
+    * success factor. With zero hosts the ratio is NaN and the comparison
+    * is false, so an empty role is never reported as successful.
+    */
+   boolean isRoleSuccessful() {
+     return successFactor <= (1.0*numSucceeded)/totalHosts;
+   }
+
+   /**
+    * Returns true while at least one host command is still pending or queued.
+    */
+   boolean isRoleInProgress() {
+     return (numPending+numQueued > 0);
+   }
+
+   /**
+    * Returns true when the role can no longer succeed: nothing is pending or
+    * queued any more and the success factor has not been reached.
+    * (The previous implementation returned the negation of this.)
+    */
+   boolean isRoleFailed() {
+     return !isRoleInProgress() && !isRoleSuccessful();
+   }
}
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java Fri Sep 14 06:24:50 2012
@@ -29,4 +29,15 @@ public class HostAction {
public HostAction(String host) {
this.host = host;
}
+
+ public List<HostRoleCommand> getRoleCommands() {
+ return roles;
+ }
+
+ public String getManifest() {
+ if (manifest == null) {
+ //generate manifest
+ }
+ return manifest;
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java Fri Sep 14 06:24:50 2012
@@ -30,15 +30,39 @@ import org.apache.ambari.server.RoleComm
public class HostRoleCommand {
private final Role role;
private Map<String, String> params = null;
- private final float successFactor;
private HostRoleStatus status = HostRoleStatus.PENDING;
private final RoleCommand cmd;
- //private HostRoleFSMEvent event;
+ private long startTime;
+ private long expiryTime;
+ private final String host;
- public HostRoleCommand(String host, Role role, float successFactor,
- RoleCommand cmd) {
+ public HostRoleCommand(String host, Role role, RoleCommand cmd) {
+ this.host = host;
this.role = role;
- this.successFactor = successFactor;
this.cmd = cmd;
}
+
+ public Role getRole() {
+ return role;
+ }
+
+ public HostRoleStatus getStatus() {
+ return status;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getExpiryTime() {
+ return expiryTime;
+ }
+
+ public void setExpiryTime(long t) {
+ expiryTime = t;
+ }
+
+ public String getHostName() {
+ return this.host;
+ }
}
Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java Fri Sep 14 06:24:50 2012
@@ -17,16 +17,77 @@
*/
package org.apache.ambari.server.actionmanager;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.server.Role;
//This class encapsulates the stage. The stage encapsulates all the information
//required to persist an action.
public class Stage {
- private long requestId;
+ private final long requestId;
private long stageId = -1;
+
+ //Map of roles to successFactors for this stage. Default is 1 i.e. 100%
+ private Map<Role, Float> successFactors = new HashMap<Role, Float>();
//Map of host to host-roles
- private Map<String, HostAction> action;
- private String logDir;
+ private Map<String, HostAction> hostActions = new TreeMap<String, HostAction>();
+ private final String logDir;
+
+ public Stage(long requestId, String logDir) {
+ this.requestId = requestId;
+ this.logDir = logDir;
+ }
+
+ public synchronized void setStageId(long stageId) {
+ if (this.stageId != -1) {
+ throw new RuntimeException("Attempt to set stageId again! Not allowed.");
+ }
+ this.stageId = stageId;
+ }
+
+ public synchronized long getStageId() {
+ return stageId;
+ }
+
+ public String getActionId() {
+ return "" + requestId + "-" + stageId;
+ }
+
+ synchronized void addHostAction(String host, HostAction ha) {
+ hostActions.put(host, ha);
+ }
+
+ synchronized HostAction getHostAction(String host) {
+ return hostActions.get(host);
+ }
+
+ /**
+ * Returns an internal data structure, please don't modify it.
+ * TODO: Ideally should return an iterator.
+ */
+ synchronized Map<String, HostAction> getHostActions() {
+ return hostActions;
+ }
+
+ synchronized float getSuccessFactor(Role r) {
+ Float f = successFactors.get(r);
+ if (f == null) {
+ return 1;
+ } else {
+ return f;
+ }
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ public String getManifest(String hostName) {
+ // TODO Auto-generated method stub
+ return getHostAction(hostName).getManifest();
+ }
}