You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ji...@apache.org on 2012/09/14 08:24:50 UTC

svn commit: r1384653 - in /incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager: ActionDBAccessor.java ActionManager.java ActionScheduler.java HostAction.java HostRoleCommand.java Stage.java

Author: jitendra
Date: Fri Sep 14 06:24:50 2012
New Revision: 1384653

URL: http://svn.apache.org/viewvc?rev=1384653&view=rev
Log:
AMBARI-722. Action scheduler implementation.

Modified:
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
    incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java Fri Sep 14 06:24:50 2012
@@ -19,9 +19,13 @@ package org.apache.ambari.server.actionm
 
 import java.util.List;
 
+import org.apache.ambari.server.Role;
+
 public class ActionDBAccessor {
+  
+  // In-memory counter backing getNextStageId(); it starts at 0 in every new
+  // instance and is not persisted anywhere yet.
+  private long stageId = 0;
+  
+  /**
+   * Intended to persist the given host action.
+   * NOTE(review): not implemented yet; this method currently does nothing.
+   */
   public void persistAction(HostAction ha) {
-
   }
 
   public Stage getAction(String actionId) {
@@ -31,4 +35,34 @@ public class ActionDBAccessor {
+  /**
+   * Intended to return all stages of the given request.
+   * NOTE(review): stub; currently always returns null.
+   */
   public List<Stage> getAllStages(String requestId) {
     return null;
   }
+  
+  /**
+   * Returns all the stages that have been queued but not completed yet.
+   * This is used by the scheduler to find all pending actions.
+   * NOTE(review): stub; currently always returns null, so callers must
+   * handle a null result.
+   */
+  public List<Stage> getQueuedStages() {
+    return null;
+  }
+  
+  /**
+   * Returns all the stages that have not been queued yet.
+   * NOTE(review): stub; currently always returns null.
+   */
+  public List<Stage> getNotQueuedStages() {
+    return null;
+  }
+  
+  /**
+   * Returns the next stage id in the sequence; the first call returns 1.
+   * The sequence is meant to be persisted, but is currently kept only in
+   * memory.
+   */
+  public synchronized long getNextStageId() {
+    stageId += 1;
+    return stageId;
+  }
+
+  /**
+   * Intended to mark all pending or queued actions of the given request as
+   * aborted.
+   * NOTE(review): not implemented yet; currently a no-op.
+   */
+  public void abortOperation(long requestId) {
+    //Mark all pending or queued actions for this request as aborted.
+  }
+
+  /**
+   * Intended to record that the given role of the given stage timed out.
+   * NOTE(review): not implemented yet; currently a no-op. The signature has
+   * no host parameter, so it cannot identify which host's command timed out.
+   */
+  public void timeoutHostRole(long requestId, long stageId, Role role) {
+    // TODO: not implemented yet.
+  }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java Fri Sep 14 06:24:50 2012
@@ -19,6 +19,7 @@ package org.apache.ambari.server.actionm
 
 import java.util.List;
 
+import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
 
 
@@ -26,6 +27,24 @@ import org.apache.ambari.server.agent.Co
  * This class acts as the interface for action manager with other components.
  */
 public class ActionManager {
+  private final ActionScheduler scheduler;
+  private final ActionDBAccessor db;
+  private final ActionQueue actionQueue;
+
+  /**
+   * Creates the action manager together with its database accessor and
+   * scheduler. The scheduler is not started until {@link #initialize()}.
+   *
+   * @param schedulerSleepTime pause between scheduler passes, in milliseconds
+   * @param actionTimeout per-attempt command timeout, in milliseconds
+   * @param aq queue used to deliver commands to the agents
+   */
+  public ActionManager(long schedulerSleepTime, long actionTimeout, ActionQueue aq) {
+    this.db = new ActionDBAccessor();
+    this.actionQueue = aq;
+    this.scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout,
+        this.db, this.actionQueue);
+  }
+  
+  /** Starts the action scheduler on its own thread. */
+  public void initialize() {
+    this.scheduler.start();
+  }
+  
+  /** Asks the action scheduler to stop. */
+  public void shutdown() {
+    this.scheduler.stop();
+  }
+  
+  /**
+   * Intended to store the given stages in the action database.
+   * NOTE(review): currently a no-op; the stages are not stored yet.
+   */
   public void sendActions(List<Stage> stages) {
     //Store all these actions to the db
   }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java Fri Sep 14 06:24:50 2012
@@ -17,27 +17,204 @@
  */
 package org.apache.ambari.server.actionmanager;
 
-//This class encapsulates the action scheduler thread.
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.ExecutionCommand;
+
+//This class encapsulates the action scheduler thread.
-//Action schedule frequently looks at action database and determines if
-//there is an action that can be scheduled.
+//The action scheduler periodically looks at the action database and
+//determines whether there is an action that can be scheduled.
-public class ActionScheduler implements Runnable {
-
+class ActionScheduler implements Runnable {
+
   private final long actionTimeout;
   private final long sleepTime;
-
-  public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec) {
+  // Cleared by stop() to make the scheduler loop exit.
+  private volatile boolean shouldRun = true;
+  private Thread schedulerThread = null;
+  private final ActionDBAccessor db;
+  // A command is timed out for good once actionTimeout * maxAttempts has
+  // elapsed since its start time.
+  private final short maxAttempts = 2;
+  private final ActionQueue actionQueue;
+
+  /**
+   * @param sleepTimeMilliSec pause between two scheduling passes
+   * @param actionTimeoutMilliSec timeout of a single dispatch attempt
+   * @param db accessor used to read queued stages and record aborts/timeouts
+   * @param actionQueue queue used to (re)send commands to hosts
+   */
+  public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
+      ActionDBAccessor db, ActionQueue actionQueue) {
     this.sleepTime = sleepTimeMilliSec;
     this.actionTimeout = actionTimeoutMilliSec;
+    this.db = db;
+    this.actionQueue = actionQueue;
+  }
+
+  /** Starts the scheduler loop on a new thread. */
+  public void start() {
+    schedulerThread = new Thread(this);
+    schedulerThread.start();
+  }
+
+  /**
+   * Asks the scheduler loop to exit and interrupts the scheduler thread.
+   * Safe to call even if {@link #start()} was never invoked.
+   */
+  public void stop() {
+    shouldRun = false;
+    // stop() may run before start(), when there is no thread to interrupt.
+    if (schedulerThread != null) {
+      schedulerThread.interrupt();
+    }
   }

+  /**
+   * Scheduler loop: runs one pass, then sleeps sleepTime milliseconds,
+   * until stop() is called or the thread is interrupted.
+   */
   @Override
   public void run() {
-    try {
-      //Check db for any pending actions and determine if something can be scheduled.
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException ex) {
-      //Shutting down;
+    while (shouldRun) {
+      try {
+        doWork();
+      } catch (Exception ex) {
+        // Best effort: a failed pass must not kill the scheduler thread.
+        // TODO: log the exception.
+      }
+      // Sleep outside the try above so that a pass that throws still waits
+      // before the next attempt instead of spinning on the database.
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ex) {
+        // Interrupted (e.g. by stop()): exit and keep the interrupt status.
+        shouldRun = false;
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * One scheduling pass over the queued stages, in the order the database
+   * returns them. For each stage, expired in-flight commands are re-sent or
+   * timed out, and then each role's status is evaluated. A role that has
+   * neither succeeded nor has commands in progress fails the request, which
+   * is then aborted. The pass stops at the first stage that failed or whose
+   * roles have not all succeeded yet.
+   */
+  private void doWork() {
+    List<Stage> stages = db.getQueuedStages();
+    if (stages == null || stages.isEmpty()) {
+      //Nothing to do
       return;
     }
+
+    //First discover completions and timeouts.
+    boolean operationFailure = false;
+    for (Stage s : stages) {
+      Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = getInvertedRoleMap(s);
+
+      //Iterate for completion
+      boolean moveToNextStage = true;
+      for (Map.Entry<Role, Map<String, HostRoleCommand>> e : roleToHrcMap.entrySet()) {
+        Role r = e.getKey();
+        Map<String, HostRoleCommand> hrcMap = e.getValue();
+        processPendingsAndReschedule(s, hrcMap);
+        RoleStatus roleStatus = getRoleStatus(hrcMap, s.getSuccessFactor(r));
+        if (!roleStatus.isRoleSuccessful()) {
+          if (!roleStatus.isRoleInProgress()) {
+            //The role has completely failed
+            //Mark the entire operation as failed
+            operationFailure = true;
+            break;
+          }
+          moveToNextStage = false;
+        }
+      }
+      if (operationFailure) {
+        db.abortOperation(s.getRequestId());
+      }
+      if (operationFailure || !moveToNextStage) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Handles the in-flight commands of one role whose current attempt has
+   * expired. A command is re-sent while attempts remain, and reported as
+   * timed out once actionTimeout * maxAttempts has passed since its start
+   * time. Commands that are no longer pending or queued are left untouched.
+   */
+  private void processPendingsAndReschedule(Stage stage,
+      Map<String, HostRoleCommand> hrcMap) {
+    for (HostRoleCommand hrc : hrcMap.values()) {
+      HostRoleStatus status = hrc.getStatus();
+      // Only commands still in progress (the statuses RoleStatus counts as
+      // in progress) may be retried or timed out. Finished commands must
+      // never be re-sent.
+      if (status != HostRoleStatus.PENDING && status != HostRoleStatus.QUEUED) {
+        continue;
+      }
+      long now = System.currentTimeMillis();
+      if (now > hrc.getExpiryTime()) {
+        // expired
+        if (now > hrc.getStartTime() + actionTimeout * maxAttempts) {
+          // final expired
+          db.timeoutHostRole(stage.getRequestId(), stage.getStageId(), hrc.getRole());
+        } else {
+          rescheduleHostRole(stage, hrc);
+        }
+      }
+    }
+  }
+
+  /**
+   * Re-sends the command to its host and gives the new attempt a fresh
+   * timeout window of actionTimeout milliseconds.
+   */
+  private void rescheduleHostRole(Stage s,
+      HostRoleCommand hrc) {
+    long now = System.currentTimeMillis();
+    // An expiry of "now" would make the command look expired again on the
+    // very next pass, so the retry would never get a chance to complete.
+    hrc.setExpiryTime(now + actionTimeout);
+    ExecutionCommand cmd = new ExecutionCommand();
+    cmd.setCommandId(s.getActionId());
+    cmd.setManifest(s.getManifest(hrc.getHostName()));
+    actionQueue.enqueue(hrc.getHostName(), cmd);
+  }
+
+  /**
+   * Tallies the statuses of one role's commands across all of its hosts.
+   *
+   * @param hostRoleCmdForRole the role's commands, keyed by host name
+   * @param successFactor minimum fraction of hosts that must complete for the
+   *        role to count as successful
+   */
+  private RoleStatus getRoleStatus(
+      Map<String, HostRoleCommand> hostRoleCmdForRole, float successFactor) {
+    RoleStatus rs = new RoleStatus(hostRoleCmdForRole.size(), successFactor);
+    for (HostRoleCommand hrc : hostRoleCmdForRole.values()) {
+      switch (hrc.getStatus()) {
+      case COMPLETED:
+        rs.numSucceeded++;
+        break;
+      case FAILED:
+        rs.numFailed++;
+        break;
+      case QUEUED:
+        rs.numQueued++;
+        break;
+      case PENDING:
+        rs.numPending++;
+        break;
+      case TIMEDOUT:
+        rs.numTimedOut++;
+        break;
+      case ABORTED:
+        rs.numAborted++;
+        break;
+      }
+    }
+    return rs;
+  }
+
+  /**
+   * Regroups the commands of a stage by role: role -> (host name -> command).
+   */
+  private Map<Role, Map<String, HostRoleCommand>> getInvertedRoleMap(Stage s) {
+    Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = new TreeMap<Role, Map<String, HostRoleCommand>>();
+    for (Map.Entry<String, HostAction> entry : s.getHostActions().entrySet()) {
+      String h = entry.getKey();
+      for (HostRoleCommand hrc : entry.getValue().getRoleCommands()) {
+        Map<String, HostRoleCommand> hrcMap = roleToHrcMap.get(hrc.getRole());
+        if (hrcMap == null) {
+          hrcMap = new TreeMap<String, HostRoleCommand>();
+          roleToHrcMap.put(hrc.getRole(), hrcMap);
+        }
+        hrcMap.put(h, hrc);
+      }
+    }
+    return roleToHrcMap;
+  }
+
+  /** Per-role tally of command statuses across the role's hosts. */
+  static class RoleStatus {
+    int numQueued = 0;
+    int numSucceeded = 0;
+    int numFailed = 0;
+    int numTimedOut = 0;
+    int numPending = 0;
+    int numAborted = 0;
+    final int totalHosts;
+    final float successFactor;
+
+    RoleStatus(int total, float successFactor) {
+      this.totalHosts = total;
+      this.successFactor = successFactor;
+    }
+
+    /** True when the fraction of completed hosts is at least successFactor. */
+    boolean isRoleSuccessful() {
+      return successFactor <= (1.0 * numSucceeded) / totalHosts;
+    }
+
+    /** True while any command of the role is still pending or queued. */
+    boolean isRoleInProgress() {
+      return (numPending + numQueued > 0);
+    }
+
+    /**
+     * True when the role can no longer succeed: nothing is in progress and
+     * the success threshold was not reached.
+     */
+    boolean isRoleFailed() {
+      return !isRoleInProgress() && !isRoleSuccessful();
+    }
   }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java Fri Sep 14 06:24:50 2012
@@ -29,4 +29,15 @@ public class HostAction {
+  /**
+   * @param host name of the host this action is stored for
+   */
   public HostAction(String host) {
     this.host = host;
   }
+  
+  /**
+   * Returns the role commands of this host action. The internal list is
+   * returned directly, not a copy.
+   */
+  public List<HostRoleCommand> getRoleCommands() {
+    return this.roles;
+  }
+
+  /**
+   * Returns the manifest of this host action, or null if none has been set.
+   * TODO: generate the manifest when it has not been set yet.
+   */
+  public String getManifest() {
+    return this.manifest;
+  }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java Fri Sep 14 06:24:50 2012
@@ -30,15 +30,39 @@ import org.apache.ambari.server.RoleComm
 public class HostRoleCommand {
   private final Role role;
   private Map<String, String> params = null;
-  private final float successFactor;
   private HostRoleStatus status = HostRoleStatus.PENDING;
   private final RoleCommand cmd;
-  //private HostRoleFSMEvent event;
+  // Start time of the command; 0 until setStartTime() is called. The
+  // scheduler compares it with System.currentTimeMillis().
+  private long startTime;
+  // Time after which the current attempt is treated as expired.
+  private long expiryTime;
+  private final String host;

-  public HostRoleCommand(String host, Role role, float successFactor,
-      RoleCommand cmd) {
+  /**
+   * Creates a command for {@code role} on {@code host}, initially PENDING.
+   */
+  public HostRoleCommand(String host, Role role, RoleCommand cmd) {
+    this.host = host;
     this.role = role;
-    this.successFactor = successFactor;
     this.cmd = cmd;
   }
+
+  public Role getRole() {
+    return role;
+  }
+
+  public HostRoleStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Updates the command status. Without this, the status could never leave
+   * PENDING even though the scheduler's accounting reads it.
+   */
+  public void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Records the start time. The scheduler's final-timeout check is relative
+   * to this value, so it must be set when the command is first dispatched.
+   */
+  public void setStartTime(long t) {
+    startTime = t;
+  }
+
+  public long getExpiryTime() {
+    return expiryTime;
+  }
+
+  public void setExpiryTime(long t) {
+    expiryTime = t;
+  }
+
+  public String getHostName() {
+    return this.host;
+  }
 }

Modified: incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java?rev=1384653&r1=1384652&r2=1384653&view=diff
==============================================================================
--- incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java (original)
+++ incubator/ambari/branches/AMBARI-666/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java Fri Sep 14 06:24:50 2012
@@ -17,16 +17,77 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.server.Role;
 
 //This class encapsulates the stage. The stage encapsulates all the information
 //required to persist an action.
 public class Stage {
-  private long requestId;
+  private final long requestId;
   private long stageId = -1;
+
+  //Map of roles to successFactors for this stage. Default is 1 i.e. 100%
+  private final Map<Role, Float> successFactors = new HashMap<Role, Float>();

   //Map of host to host-roles
-  private Map<String, HostAction> action;
-  private String logDir;
+  private final Map<String, HostAction> hostActions = new TreeMap<String, HostAction>();
+  private final String logDir;
+
+  /**
+   * @param requestId id of the request this stage belongs to
+   * @param logDir log directory of this stage
+   */
+  public Stage(long requestId, String logDir) {
+    this.requestId = requestId;
+    this.logDir = logDir;
+  }
+
+  /**
+   * Assigns the stage id. Once it holds a value other than -1, it cannot be
+   * changed.
+   *
+   * @throws IllegalStateException if the id has already been assigned
+   */
+  public synchronized void setStageId(long stageId) {
+    if (this.stageId != -1) {
+      throw new IllegalStateException("Attempt to set stageId again! Not allowed.");
+    }
+    this.stageId = stageId;
+  }
+
+  /** Returns the stage id, or -1 if it has not been assigned yet. */
+  public synchronized long getStageId() {
+    return stageId;
+  }
+
+  /** Returns the action id, formatted as "requestId-stageId". */
+  public String getActionId() {
+    // Read stageId under the same lock that setStageId() writes it with.
+    return "" + requestId + "-" + getStageId();
+  }
+
+  synchronized void addHostAction(String host, HostAction ha) {
+    hostActions.put(host, ha);
+  }
+
+  /** Returns the host action registered for {@code host}, or null if none. */
+  synchronized HostAction getHostAction(String host) {
+    return hostActions.get(host);
+  }
+
+  /**
+   * Returns an internal data structure, please don't modify it.
+   * TODO: Ideally should return an iterator.
+   */
+  synchronized Map<String, HostAction> getHostActions() {
+    return hostActions;
+  }
+
+  /**
+   * Returns the success factor configured for role {@code r}. Defaults to 1
+   * (100%) when none has been configured.
+   */
+  synchronized float getSuccessFactor(Role r) {
+    Float f = successFactors.get(r);
+    return (f == null) ? 1.0f : f;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  /**
+   * Returns the manifest of the host action registered for {@code hostName}.
+   *
+   * @throws IllegalArgumentException if this stage has no action for the host
+   */
+  public String getManifest(String hostName) {
+    HostAction ha = getHostAction(hostName);
+    if (ha == null) {
+      throw new IllegalArgumentException("No host action for host " + hostName);
+    }
+    return ha.getManifest();
+  }
 }